Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

React is jumbling up a websocket stream of data #247

Open
Bryson14 opened this issue Oct 10, 2024 · 9 comments
Open

React is jumbling up a websocket stream of data #247

Bryson14 opened this issue Oct 10, 2024 · 9 comments

Comments

@Bryson14
Copy link

Hello, I'm seeking help on why data is being jumbled, missed, and dropped when rendering from a websocket. I want to try making a ai chatbot for fun and learn about how websockets and SSE work. I have been banging my head for a few hours now because data is not appearing at it should.

Example

I have a fast api and is streaming data from openai api and sending it to a website. i have confirmed with backend logs and with the firefox websocket network tab that the data is getting to the website in order and all intact.

For example:

Q: what is your favorite color?

Expected A: "I don't have personal preferences, but I can help you find information about colors or suggest color schemes if you need assistance with that!"

Rendered A: "I have preferences but I help you find about or color schemes you need with"

Q: Tell me a joke

Expected A: "Sure, here's a joke for you:\n\nWhy couldn't the bicycle stand up by itself?\n\nBecause it was two tired!"

Rendered A: "Sure's for couldn stand by it was"

Code

import useWebSocket, { ReadyState } from "react-use-websocket";

export const ChatApiProvider: React.FC<ChatApiProviderProps> = ({
  children,
}) => { 
 const [messageHistory, setMessageHistory] = useState<ChatMessage[]>([]);
  const [isGenerating, setIsGenerating] = useState(false);
  const [input, setInput] = useState("");
  const [firstTokenReceived, setFirstTokenReceived] = useState(false);
  const messageBufferRef = useRef<string>('');
  const [editingMessage, setEditingMessage] = useState<ChatMessage | undefined>(
    undefined
  );
  const { toast } = useToast();

  const {
    sendMessage: sendWebSocketMessage,
    lastMessage,
    readyState,
  } = useWebSocket(WEBSOCKET_URL);

  /**
   * Responds to new data on the socket
   */
  useEffect(() => {
    if (lastMessage !== null) {
      try {
        const res = JSON.parse(lastMessage.data);
        handleWebSocketMessage(res);
      } catch (error) {
        console.error("Error parsing message:", error);
        setIsGenerating(false);
        toast({
          title: "Error",
          description: "Error parsing message from server",
        });
      }
    }
  }, [lastMessage]);

  /**
   * Handles each type of message on the socket
   * u/param res json message from the backend
   */
  const handleWebSocketMessage = (res: any): void => {
    switch (res?.type) {
      case "content":
        handleContentMessage(res.data);
        break;
      case "summary":
        handleSummaryMessage(res.data);
        break;
      case "finished":
        handleFinishedMessage();
        break;
      case "error":
        handleErrorMessage(res.data);
        break;
      default:
        console.warn("Unknown message type:", res?.type);
    }
  };

  const handleContentMessage = (content: string) => {
    setFirstTokenReceived(true);
    messageBufferRef.current += content;
    updateMessageHistory(messageBufferRef.current, "assistant", false);
  };

  const handleSummaryMessage = (summary: string) => {
    console.log("Getting summary");
    updateMessageHistory(summary, "assistant", true);
  };

  const handleFinishedMessage = () => {
    setFirstTokenReceived(false);
    setIsGenerating(false);
    messageBufferRef.current = "";
  };

  const handleErrorMessage = (errorMessage: string) => {
    console.error("Error from server:", errorMessage);
    setIsGenerating(false);
    toast({
      title: "Error",
      description: errorMessage,
    });
  };

  const updateMessageHistory = (
    message: string,
    role: "assistant" | "user",
    isNewMessage: boolean
  ) => {
    setMessageHistory((prev) => {
      const newHistory = [...prev];
      if (!isNewMessage && newHistory.length > 0) {
        const lastMessage = newHistory[newHistory.length - 1];
        if (lastMessage.role === role) {
          lastMessage.message = message;
          return newHistory;
        }
      }
      newHistory.push({
        id: uid(),
        message: message,
        role: role,
        date: new Date(),
      });
      return newHistory;
    });
  };

I'm just streaming data token by token from the open api chat in a content payload. then i tried to fix it by adding a summary payload which sends the entire message once it's all done. But that is not helping.

@robtaussig
Copy link
Owner

Can you give me an example of your top level application specifically how/where you are using ChatApiProvider? Based just off of your example:

Q: what is your favorite color?

Expected A: "I don't have personal preferences, but I can help you find information about colors or suggest color schemes if you need assistance with that!"

Rendered A: "I have preferences but I help you find about or color schemes you need with"

It looks to me like you are asking the bot the same question twice? Though it's also strange that in both of your examples, the "actual" rendered answer has typos/incorrect grammar...

There's also a few things that stand out to me in your code:

  1. In your useEffect, you call handleWebSocketMessage (which itself calls/accesses a lot of functions/state) without including it and them in your dependency array. This introduces a lot of possible bugs involving closures and stale references.

  2. I see where you are coming from with needing something like a messageBufferRef.current, but this is a bit of a code smell to me. When you mutate messageBufferRef.current, you are not necessarily triggering a component update, so if you are also rendering messageBufferRef.current, then it is entirely possible that your rendered text is one message behind your chat history. Anything that is used to render JSX should be from useState, as your component is forced to update whenever it's changed (correctly, via the state setter).

  3. This looks questionable to me:

        const lastMessage = newHistory[newHistory.length - 1];
        if (lastMessage.role === role) {
          lastMessage.message = message;
          return newHistory;
        }

While you created a shallow clone of your messageHistory state before mutating it, lastMessage is an object in your current state that you are mutating with lastMessage.message = message; -- again, depending on how you are using messageHistory (passing it down the app through context? Are there message components that are memoized on the message prop that you are mutating (but will not register as a change to them?), this can cause a variety of bugs.

  1. This won't necessarily fix your problem, but the hook returns a lastJsonMessage in addition to a lastMessage, so you don't need to parse it yourself.

My suggestions to debug this would be to start from the basics -- instead of going straight for a Provider/Consumer pattern, call useWebsocket directly by your components. A big point of the library is that it allows you to "share" a websocket connection (if you set share: true) across your app, without having to worry about context. I do understand that you might need a Provider in order to retain a source of truth formessageHistory, but for purposes of debugging, let the component that renders your messages retain the chat history (which you will obviously lose if/when that component unmounts). But the first thing that came to mind when I read your code and thought about your bug, is that you might be creating multiple chat instances, and/or web socket connections, possibly due to having multiple Providers (based on the fact that you are receiving multiple messages from the bot that seemingly address the same question, but worded differently).

I'd also add a console.log in your component body directly:

  const {
    sendMessage: sendWebSocketMessage,
    lastMessage,
    readyState,
  } = useWebSocket(WEBSOCKET_URL);
console.log(lastMessage);

This will allow you to see the order in which the messages are being passed in, without worrying about bugs that might be introduced when you add complexity like useEffect and useRef. I'd also suggest using share: true for the reasons I mentioned above.

Also, instead of your useEffect, try:

  const {
    sendMessage: sendWebSocketMessage,
    lastMessage,
    readyState,
  } = useWebSocket(WEBSOCKET_URL, {
    onMessage: (message) => {
       console.log({ message });
       handleWebSocketMessage(res);
    }
  });

That callback is invoked directly from the websocket message callback, and so it's closer to the source than lastMessage, which comes through react state. While I've never seen lastMessage be out of sync with what you've received through the websocket, it would at least eliminate one avenue of bugs.

@gandhis1
Copy link

I had the same thing happen to me as well. Yes, I understand hooks like these are intended to hold server-side state and serve as global caches of data, to permit calling them from anywhere.

In my case, I am combining data from a useWebsocket with a useQuery and memoizing a client-side post-process of the combined data, which then gets saved into global Zustand state for use throughout the application. Both of the former hooks hold cached state and so calling them from everywhere is fine, however memoization via useMemo is not globally-shared state and would be called redundantly if I were to attempt the idiomatic approach of re-using the hook calls everywhere. As a result, I use a useEffect to synchronize this data into my global Zustand store, which is what components in my application actually access.

I believe where I went wrong is that I ended up storing the raw copy of the data (lastJsonMessage) unaltered into the global store, which was later destroyed or corrupted in some way particular to the internals of the use-websocket library. Instead of doing that, I post-process first, which creates a shallow copy of the data, discarding the raw message data, and then store that via my useEffect instead.

Does that sound reasonable? Open to any other ideas of course. The data corruption issue is transient so it is not totally obvious whether my change actually worked or not.

@gandhis1
Copy link

Update here - my change did not actually work. I can confirm that (1) my back-end websocket server produces valid JSON, (2) WebSocketHook.lastMessage returns corrupted data, (3) when lastMessage is deserialized via lastJsonMessage, it produces a bad Object.

@robtaussig
Copy link
Owner

@gandhis1 There isn't a whole lot going on with lastJsonMessage:

lastMessage is from useState:

const [lastMessage, setLastMessage] = useState<WebSocketEventMap['message'] | null>(null);

Which is called directly in WebSocket.onmessage:

webSocketInstance.onmessage = (message: WebSocketEventMap['message']) => {
    optionsRef.current.onMessage && optionsRef.current.onMessage(message);

    if (typeof lastMessageTime?.current === 'number') {
      lastMessageTime.current = Date.now();
    }

    if (typeof optionsRef.current.filter === 'function' && optionsRef.current.filter(message) !== true) {
      return;
    }
    if (
      optionsRef.current.heartbeat &&
      typeof optionsRef.current.heartbeat !== "boolean" &&
      optionsRef.current.heartbeat?.returnMessage === message.data
    ) {
      return;
    }

    setLastMessage(message);
  };
};

And finally passed through JSON.parse, with guard rails:

const lastJsonMessage: T = useMemo(() => {
    if (lastMessage) {
      try {
        return JSON.parse(lastMessage.data);
      } catch (e) {
        return UNPARSABLE_JSON_OBJECT;
      }
    }
    return null;
  }, [lastMessage]);

@robtaussig
Copy link
Owner

@gandhis1 That said, if you are using a shared websocket, then all components that call the same endpoint are consuming the same lastMessage. So if any one component directly mutates lastMessage, then that could explain what you are experiencing. You'd be surprised with how often this is done -- to just use the code posted above:

const updateMessageHistory = (
    message: string,
    role: "assistant" | "user",
    isNewMessage: boolean
  ) => {
    setMessageHistory((prev) => {
      const newHistory = [...prev];
      if (!isNewMessage && newHistory.length > 0) {
        const lastMessage = newHistory[newHistory.length - 1];
        if (lastMessage.role === role) {
          lastMessage.message = message;
          return newHistory;
        }
      }
      newHistory.push({
        id: uid(),
        message: message,
        role: role,
        date: new Date(),
      });
      return newHistory;
    });
  };

In particular this:

const lastMessage = newHistory[newHistory.length - 1];
if (lastMessage.role === role) {
  lastMessage.message = message;
  return newHistory;
}

It seems like a pretty reasonable solution to update a chat message if the same user posts twice in a row, but this pattern is dangerous if multiple components are calling useWebSocket to the same endpoint (with share: true). An example of how this kind of bug can play out, especially when multiple developers are working on separate functionality:

// Component responsible for displaying chat history

const { lastMessage } = useWebSocket('wss://chat-endpoint.com', { share: true });
const [chatHistory, setChatHistory] = useState([]);

useEffect(() => {
  if (lastMessage) {
    lastMessage.data = JSON.parse(lastMessage.data);
    setChatHistory(lastMessage);
  }
}, [lastMessage]);
// Component responsible for displaying a toast notification whenever a message is received
const { lastMessage } = useWebSocket('wss://chat-endpoint.com', { share: true });

const jsonMessage = useMemo(() => {
  return JSON.parse(lastMessage.data);
}, [lastMessage]);

useEffect(() => {
  messageToast({ message: jsonMessage.message, dateReceived: jsonMessage.createdAt })
}, [jsonMessage]);

If the second component renders after the first, then it is going to throw an error when it calls JSON.parse on lastMessage.data, which has already been parsed directly by the first component (ironically, lastJsonMessage would be safer to use here, because all components are using their own parsed copy of lastJsonMessage). And if you are the developer working on the second component, I can totally see why the first thing they'd assume is that there is a bug with the "internals" of the library.

@gandhis1
Copy link

So a few additional points:

  • Because I am syncing this to a global store, I do this once in a dedicated component at the root of my application. I can confirm through inline console logging that this component only renders once per websocket update. That would seem to confirm that there is only a single access point.
  • When I first got this issue, I was not using share, although now I do have that set (I don't need it, so I can and will revert).
  • Nothing in my application mutates lastMessage. I only access lastJsonMessage, which is expected to be an array, and I do a shallow copy of that array in my post-processing / move and do not mutate lastJsonMessage directly either.
  • This issue is transient, in that I can get hundreds of messages across hours before the first time I get a corrupted message.
    • This is a key point. Once I get the first corrupted message in lastMessage, every message from that point forwards is corrupted. I continue to get messages and they continue to have changed data, it's just that data is corrupted every single time.
  • I know it's not the websocket server because another websocket client is unaffected.
    • Another key point. When this corruption happens, it happens to multiple people running the same version of the UI at a time. But someone else running a different version of the UI, hitting the same websocket, may not get affected (and vice versa).

Here is an example of a corrupted message:

Image

In a follow-up, I will paste some anonymized/simplified code here that precisely represents my usage pattern.

@gandhis1
Copy link

gandhis1 commented Jan 17, 2025

Here is what I am doing. This has been paraphrased / anonymized.

App.tsx (application root)

function GlobalFeeds (
    // This is a component so this only renders itself and nothing else
    // The websocket interacts with the rest of the application through the global store
    useGlobalItems()
    return null;
)

function MyRootComponent (
    return <>
        <MyComponent>
        <GlobalFeeds>
   </>
)

useGlobalItems hook

import axios from "axios";
import { format, isToday } from "date-fns";
import { useEffect, useMemo } from "react";
import { useQuery } from "react-query";
import useWebSocket from "react-use-websocket";
import useGlobalStore from "../stores/globalStore";
import Item from "../types/Item";

export default function useGlobalItems() {
  const asOfDate = useGlobalStore((state) => state.asOfDate);
  const liveMode = isToday(asOfDate);

  // Live items
  const webSocketHook = useWebSocket<Item[] | undefined>(
    import.meta.env.VITE_WEBSOCKET_ITEMS,
    {
      shouldReconnect: () => true,
      reconnectInterval: 1000,
      reconnectAttempts: 60,
      retryOnError: true,
    },
    liveMode,
  );

  // Historical items
  const queryResult = useQuery(
    ["items", asOfDate],
    async () =>
      await axios
        .get<Item[]>("/items", {
          params: { asof_date: format(asOfDate, "yyyy-MM-dd") },
        })
        .then((res) => res.data),
    {
      enabled: !liveMode,
      staleTime: 300_000, // 5 minutes
      refetchOnMount: "always",
      refetchOnWindowFocus: "always",
    },
  );

  // Global item configuration toggles
  const myToggle1 = useGlobalStore(
    (state) => state.myToggle1,
  );

  // Post-process items
  const liveItems = webSocketHook.lastJsonMessage;
  const historicalItems = queryResult.data;
  const rawItems = liveMode ? liveItems : historicalItems;
  console.log("webSocketHook", webSocketHook); // **************** THIS lastMessage CORRUPTED AFTER HOURS OF WORKING *****************
  console.log("liveMode", liveMode);
  console.log("liveItems", liveItems);
  console.log("historicalItems", historicalItems);
  const {
    items,
  } = useMemo(() => {
    if (!rawItems || !Array.isArray(rawItems)) {
      return {
        items: undefined,
      };
    }
    return postProcessItems(
      rawItems,
      myToggle1
    );
  }, [
    rawItems,
    myToggle1
  ]);

  // Synchronize to global store
  useEffect(() => {
    useGlobalStore.setState({items});
  }, [
    items,
  ]);
}

function postProcessItems(
  items: Item[],
  myToggle1: boolean
) {
  const processedItems = [];
  for (const item of items) {
    // this makes a shallow copy
    const processedItem: Item = {
      ...item,
      // Override some stuff based on myToggle1
    };
    processedItems.push(processedItem);
  }

  return {
    items: processedItems.length > 0 ? processedItems : undefined,
  };
}

In the above code webSocketHook.lastJsonMessage and webSocketHook.lastMessage both become corrupted after hours of working. Once it becomes corrupted for a given message, it is permanent and happens for every subsequent message. Upon refresh, everything is back to normal.

@gandhis1
Copy link

gandhis1 commented Jan 17, 2025

One other point. So given the reality that the websocket connection could get into a bad state where it permanently corrupts incoming messages, I decided to explore (1) detecting when the incoming data is corrupt (2) resetting the websocket connection when it happens. I did this by adding extra cruft (a UUID) that gets ignored to the websocket URL, and then changing that in a useEffect, so that the websocket URL changes, but it ends up connecting to the same websocket;.

Turns out that this approach works. Nothing in the websocket server changed, nothing in the client JS code change, I only just reconnected from scratch. Reconnecting fixes the corruption problems. So I'm still leaning towards it being something in the library itself that is going into a bad state.

@gandhis1
Copy link

Apologies for the repeated posts, but just want to give full color. One thing I did not fully troubleshoot is whether this is a browser-level issue. If all this library is doing is using the browser's websocket API, then the problem could be in the browser. I will check the "Network" / "WS" tab the next time I am able to replicate to see whether the raw response is also corrupted. And if it is, I have no idea what that means. Could be an internal firewall/proxy issue even.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

3 participants