Web Streams API in Action: Delivering 6000+ Log Lines Concurrently Across 20 Tabs

In today's fast-paced digital world, efficient data handling is crucial. Whether you're processing massive amounts of data, streaming real-time updates, or delivering content to multiple clients concurrently, traditional load-everything-at-once approaches can quickly become a bottleneck. That's where the Web Streams API comes in: it lets you stream data efficiently without overloading your server's memory. The term 'streams' means exactly what it sounds like: a stream of flowing data, which we consume in small pieces as it continuously flows towards us. Let's explore this powerful API through the lens of a real-world example: delivering over 6000 log lines concurrently across 20 tabs.
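To make that concrete, here's a minimal standalone sketch (separate from the example project, and assuming a browser or Node 18+ environment where ReadableStream is available globally) that builds a stream of three chunks and consumes them one at a time:

// A tiny ReadableStream that emits three chunks of text, then closes.
const stream = new ReadableStream<string>({
  start(controller) {
    controller.enqueue("first chunk\n");
    controller.enqueue("second chunk\n");
    controller.enqueue("third chunk\n");
    controller.close();
  },
});

// Consume the stream chunk by chunk instead of waiting for all of it at once.
async function consume() {
  const reader = stream.getReader();
  while (true) {
    const { value, done } = await reader.read();
    if (done) break;
    console.log("received:", value);
  }
}

consume();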

A Brief History Lesson

Imagine you have a large log file with over 6000 lines of valuable information. Traditionally, fetching this data and displaying it on a webpage would require loading the entire file into memory, which could slow down your server and impact performance. However, with the Web Streams API, you can stream the data on-demand, only fetching what you need when you need it. This not only saves memory and server (and client) resources, but also ensures a seamless user experience, as the logs can be displayed in real-time without delay.

The applications of the Web Streams API extend far beyond log files. Consider scenarios such as real-time analytics dashboards, chat applications, or even media streaming services. By using the Web Streams API, you can efficiently handle large volumes of data and provide responsive and dynamic experiences to your users.

An Example Using a Large Log File (6000+ lines)

To demonstrate the power of the Web Streams API, let's dive into some code snippets. We'll be using Next.js and the App Router for this example, but the Web Streams API can be used in many frameworks and environments since it's a standardized web API (you can read more about it here).

TL;DR: To see the code in action and explore the full example project, check out the GitHub repository: streaming-text-from-file-example.

The example repository has only three important files: app/api/route.ts, app/logs/page.tsx and app/page.tsx. Let's go over the code of these three in more detail.

app/api/route.ts

import { NextResponse } from "next/server";
import fs from "node:fs";
import path from "node:path";

/**
 *
 * @param duration - milliseconds to wait before resolving the promise.
 */
function sleep(duration = 2000) {
  return new Promise((resolve) => {
    setTimeout(resolve, duration);
  });
}

async function readFile(filePath: string, start: number, end: number) {
  const readStream = fs.createReadStream(filePath, {
    start,
    end,
  });

  // The requested range (16 KB in this example) is smaller than the stream's default
  // highWaterMark (64 KiB), so the whole range typically arrives in a single 'data' event.
  return new Promise<Buffer>((resolve, reject) => {
    readStream.on("data", (data) => {
      resolve(data as Buffer);
    });
    readStream.on("error", reject);
  });
}

function getFileReaderConfig() {
  // File being read. The file contains at least 6,000 lines of logs.
  // They'll all be efficiently streamed to the frontend without overusing your server's memory.
  // Open multiple tabs using the same URL and see for yourself.
  const filePath = path.join(process.cwd(), `logs/2024-02-13.log`);
  const fileStats = fs.statSync(filePath);
  const byteLengthToRead = 16000; // represents 16KB. There's no reason for this, I just chose the number since it
  // showcases streaming better.

  const start = 0;
  let end = start + byteLengthToRead;

  if (end > fileStats.size) {
    end = fileStats.size;
  }

  return { filePath, start, end, byteLengthToRead, fileSize: fileStats.size };
}

async function* makeIterator(
  filePath: string,
  start: number,
  end: number,
  byteLengthToRead: number,
  fileSize: number,
): AsyncGenerator<Buffer> {
  if (start >= end) {
    // we have read the whole file, so stop the generator.
    return;
  }
  // This is just added here to better showcase streaming on the browser. You'll be able to see data being populated.
  // Feel free to remove this line to use streaming at its best.
  await sleep(10);

  // fs.createReadStream treats both `start` and `end` as inclusive offsets, so each call reads
  // the range [start, end - 1]; the next chunk can then begin exactly at `end` without
  // skipping or re-reading a byte.
  yield await readFile(filePath, start, end - 1);

  // slide the window forward by [byteLengthToRead] bytes.
  const _start = end;
  let _end = end + byteLengthToRead;

  // if sliding forward would take us past the end of the file, clamp [_end] to the file size.
  if (_end > fileSize) {
    _end = fileSize;
  }
  yield* makeIterator(filePath, _start, _end, byteLengthToRead, fileSize);
}

function iteratorToStream(iterator: any) {
  return new ReadableStream({
    async pull(controller) {
      const { value, done } = await iterator.next();

      if (done) {
        controller.close();
      } else {
        controller.enqueue(value);
      }
    },
  });
}

export async function GET() {
  try {
    const { filePath, start, end, byteLengthToRead, fileSize } =
      getFileReaderConfig();

    const iterator = makeIterator(
      filePath,
      start,
      end,
      byteLengthToRead,
      fileSize,
    );
    const stream = iteratorToStream(iterator);

    return new Response(stream, {
      headers: {
        "Content-Type": "text/html; charset=utf-8",
      },
    });
  } catch (e) {
    return NextResponse.json(
      {
        message: `A server error occurred. Contact support. ${(e as any)?.message}`,
        code: 500,
      },
      {
        status: 500,
      },
    );
  }
}

This file defines a route handler (the App Router's term for what the Pages Router called API routes. Read more here: Next Route Handlers).

We export a GET function that handles GET requests to this route. The helper function getFileReaderConfig() reads information about the file we're about to serve (rather than reading the file itself). Here we gather important details like the file's size, which we'll use to calculate the chunks we'll be serving back to the client.

We then have the makeIterator function, which is an async generator function (as declared by async function*). A generator function persists its context across calls: it can pause at a yield and, when next() is called again, remember the state it had and resume from it. You can read more here.
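As a quick standalone illustration (not part of the project code), a generator pauses at each yield and resumes from exactly where it left off on the next call to next():

function* countTo(limit: number) {
  // `current` survives between calls to next() because the generator's execution
  // is suspended at each `yield` rather than discarded.
  for (let current = 1; current <= limit; current++) {
    yield current;
  }
}

const counter = countTo(3);
console.log(counter.next()); // { value: 1, done: false }
console.log(counter.next()); // { value: 2, done: false }
console.log(counter.next()); // { value: 3, done: false }
console.log(counter.next()); // { value: undefined, done: true }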

This is important since we need a way to remember the last chunk we served to the client and the boundary that chunk got us to. For example, if we're serving a 1000-byte file with a chunk size of 100 bytes and we have already served 5 chunks, only the remaining 500 bytes are left to send (100 bytes * 5 chunks = 500 bytes served; 1000 - 500 = 500 bytes remaining).

The makeIterator function receives the fileSize (the total number of bytes to read), the start position (initially 0) and the end position (initially byteLengthToRead, i.e. one 'chunk' of fixed length, in this case 16,000 bytes). We then use a sliding-window technique, continuously moving the 'chunk' window towards the end of the file until the whole file has been read.
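To make the window arithmetic concrete, here's a small standalone sketch (the 40,000-byte file size is hypothetical) that prints the byte ranges the sliding window walks through:

const fileSize = 40000; // hypothetical total file size in bytes
const byteLengthToRead = 16000; // the chunk size used in the route handler above

let start = 0;
let end = Math.min(start + byteLengthToRead, fileSize);

// Slide the window forward until the whole file has been covered.
while (start < end) {
  console.log(`reading bytes ${start}..${end - 1}`);
  start = end;
  end = Math.min(end + byteLengthToRead, fileSize);
}
// Output:
// reading bytes 0..15999
// reading bytes 16000..31999
// reading bytes 32000..39999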

We then read the file itself using fs.createReadStream, which accepts an options argument that lets you specify the start and end byte positions of the range you intend to read (both offsets are inclusive).
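For instance, this standalone snippet reads just the first 100 bytes of the same log file; note that both start and end are inclusive byte offsets:

import fs from "node:fs";

// Read bytes 0 through 99 (inclusive) of the log file and nothing more.
const firstHundredBytes = fs.createReadStream("logs/2024-02-13.log", {
  start: 0,
  end: 99,
});

firstHundredBytes.on("data", (chunk) => {
  console.log(`received ${chunk.length} bytes`); // at most 100 bytes
});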

app/logs/page.tsx

"use client";
import { useCallback, useEffect, useMemo, useRef, useState } from "react";

const anchorForScrollId = "anchor-to-tell-document-to-scroll-to";
export default function Home() {
  const [streamingLogs, setStreamingLogs] = useState<string>("");
  const [errorMessage, setErrorMessage] = useState<string>("");
  const timeoutHandle = useRef<NodeJS.Timeout>();
  const isFetchOngoing = useRef(false);

  const getLogs = useCallback(async () => {
    if (isFetchOngoing.current) {
      return;
    }
    isFetchOngoing.current = true;
    setStreamingLogs("");
    try {
      const response = await fetch(`/api`);
      if (response.ok) {
        const textDecoder = new TextDecoder();
        const reader = response?.body?.getReader();

        if (reader) {
          while (true) {
            const { value, done } = await reader.read();
            if (value) {
              const stringData = textDecoder.decode(value);
              setStreamingLogs((l) => `${l}${stringData}`);
            }
            // if done, call getLogs again at some point in future for more data.
            if (done) {
              // getLogs resets the logs at the start of each fetch, so the next poll won't duplicate lines.
              clearTimeout(timeoutHandle.current);
              timeoutHandle.current = setTimeout(getLogs, 100000);
              isFetchOngoing.current = false;
              return;
            }
          }
        } else {
          const json = await response.json();
          setErrorMessage(
            `An error occurred:${json?.message}. You'll need to refresh this page.`,
          );
        }
      }
    } catch (e) {
      setErrorMessage(
        `An error occurred: ${(e as any)?.message} ${(e as any)?.response?.message}. You'll need to refresh this page.`,
      );
    }
  }, []);

  useEffect(() => {
    getLogs().then();

    const _handle = timeoutHandle.current;
    return () => {
      clearTimeout(_handle);
    };
  }, [getLogs]);

  useEffect(() => {
    document.getElementById(anchorForScrollId)?.scrollIntoView({
      behavior: "smooth",
    });
  }, [streamingLogs]);

  const logs = useMemo(() => {
    return streamingLogs?.split("\n");
  }, [streamingLogs]);

  return (
    <div className="flex flex-col w-full h-[100vh] p-10 overflow-auto">
      <h5 className="w-full pb-4 border-b border-secondary-color-100 border-solid">
        Logs should automatically start appearing below.
      </h5>
      <h5 className="text-red-700">{errorMessage}</h5>
      {logs?.map((l, idx) => (
        <h6 key={idx} className="w-full">
          {l || <br />}
        </h6>
      ))}
      <div id={anchorForScrollId} />
    </div>
  );
}

This file is a React component that does the heavy lifting on the client side. It makes a network request to the route handler we just defined, then obtains a ReadableStreamDefaultReader by calling response.body.getReader(), which lets us read individual chunks as they become available in the stream. We convert those byte chunks into strings using the TextDecoder API and render the result on the UI.
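Stripped to its essentials, the client-side read loop looks like the sketch below (not the component's exact code). One detail worth adding here: passing { stream: true } to decode() tells the TextDecoder that more chunks are on the way, so a multi-byte UTF-8 character that happens to be split across two chunks is decoded correctly instead of being garbled.

async function readAllText(response: Response): Promise<string> {
  const reader = response.body!.getReader();
  const decoder = new TextDecoder();
  let text = "";

  while (true) {
    const { value, done } = await reader.read();
    if (done) break;
    // { stream: true } buffers any partial multi-byte sequence until the next chunk arrives.
    text += decoder.decode(value, { stream: true });
  }
  // Flush anything the decoder is still holding.
  text += decoder.decode();
  return text;
}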

Notable mentions are the isFetchOngoing ref and the timeoutHandle ref. isFetchOngoing stores a boolean indicating whether a stream is already being consumed; if so, we skip sending a duplicate request, which would otherwise read an independent stream and garble the UI rendering. timeoutHandle stores the handle returned by setTimeout (since we end up polling the server every 100 seconds), which lets us clear the pending timeout if the component unmounts before it fires.

app/page.tsx

"use client";

import { useEffect } from "react";

export default function Home() {
  // open 20 tabs instantaneously to see whether the server will handle all streams.
  // You will need to 'allow all popups' in your browser to see this in effect.
  useEffect(() => {
    let tabs = 20;
    while (tabs) {
      window.open("/logs", "_blank");
      tabs--;
    }
  }, []);

  return (
    <h2 className="p-10 m-10 w-full flex flex-row items-center justify-center">
      Nothing&apos;s happening? Make sure to enable popups in your browser!
    </h2>
  );
}

This is the page rendered on the '/' (index) route. It opens 20 tabs at once, each establishing its own stream to the route handler from the earlier snippet, so we can see how the server behaves under that pressure.

The server does just fine, streaming all 6000+ lines back to all 20 clients (as represented by the tabs).

Conclusion

The Web Streams API opens up new possibilities for handling large volumes of data efficiently. By streaming data on-demand instead of loading it all into memory, you can provide responsive experiences to your users without sacrificing server performance. Whether you're dealing with log files, real-time analytics, or media streaming, incorporating the Web Streams API can greatly enhance your application's capabilities.

In this blog post, we've explored the basics of the Web Streams API, provided practical examples and code snippets, and highlighted its significance in today's data-driven world. Now it's time for you to dive in, experiment with streaming data, and unlock new possibilities for your web applications.

Remember, efficient data handling is the backbone of modern web applications. Embrace the power of streaming data, and unlock a whole new level of performance and user experience!