Implementing A Custom Chatbot With OpenAI And Python

5 min read

A walkthrough of building a custom streaming chatbot with OpenAI and Python, using FastAPI, Jinja2, HTMX, and Server Sent Events to push responses to the browser.

Building Blocks

Python Generators

Generators let you declare a function that behaves like an iterator, i.e., one that can be used in a `for` loop. They allow you to iterate over data without storing the entire data set in memory, which is very useful when dealing with large data sets or when you want to create infinite sequences.

When you call a normal Python function, it runs to completion and returns a result. Calling a generator function, by contrast, creates a generator object without running any of the function's code immediately. When you iterate over the generator (for example, with a `for` loop), the function runs until it hits a `yield` statement, hands the yielded value back to the caller, and pauses, preserving its state. The next iteration resumes execution right after that `yield` statement.

Here's an example of a simple generator function and how you use it:

def simple_generator():
    yield 1
    yield 2
    yield 3

# Generator objects can be iterated over
for value in simple_generator():
    print(value)

When you run this code, it will print:

```
1
2
3
```
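Because values are produced lazily, a generator can even represent an infinite sequence that could never fit in memory. A small sketch:

```python
def naturals():
    # Infinite sequence: values are produced lazily, one per next() call
    n = 1
    while True:
        yield n
        n += 1

gen = naturals()
for _ in range(5):
    print(next(gen))  # 1 2 3 4 5
```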

AsyncIO

Asyncio is a Python library that provides a framework for writing single-threaded concurrent code using coroutines, multiplexing I/O access over sockets and other resources. It allows you to write code that performs high-level asynchronous I/O operations, without needing to worry about thread management.

To use asyncio, you define coroutines using the `async def` syntax. A coroutine is a special function that can suspend itself while awaiting the result of an asynchronous operation, handing control back to the event loop without losing its state. This is done through the `await` keyword.

Here are some examples using asyncio:

import asyncio
import time

# Define a coroutine
async def say_after(delay, what):
    await asyncio.sleep(delay)
    print(what)

# Main coroutine that waits for other coroutines
async def main():
    print(f"started at {time.strftime('%X')}")

    # Wait for two coroutines to complete
    await say_after(1, 'hello')
    await say_after(2, 'world')

    print(f"finished at {time.strftime('%X')}")

# Running the main coroutine
asyncio.run(main())

This code prints "hello" after a delay of 1 second, then "world" after a further 2-second delay, so the whole run takes about 3 seconds because the two coroutines are awaited sequentially. Note that `asyncio.run()` is the function that runs the main coroutine, and control is passed between the event loop and the coroutines with `await`.
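If you want the two waits to overlap, run the coroutines concurrently with `asyncio.gather`; the total wait drops to about 2 seconds. A minimal variation of the example above:

```python
import asyncio
import time

async def say_after(delay, what):
    await asyncio.sleep(delay)
    print(what)

async def main():
    print(f"started at {time.strftime('%X')}")
    # Both sleeps run concurrently, so this block takes ~2s, not ~3s
    await asyncio.gather(
        say_after(1, 'hello'),
        say_after(2, 'world'),
    )
    print(f"finished at {time.strftime('%X')}")

asyncio.run(main())
```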

In summary, generators let you yield values one at a time as they are required, conserving memory, while asyncio lets you write code that handles asynchronous operations in an efficient and readable way, making it possible to perform many network or I/O-bound tasks concurrently without using multi-threading.
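The two building blocks combine in async generators, which is exactly the shape the streaming endpoints later in this article use: an `async def` function that `yield`s values and is consumed with `async for`. A minimal sketch:

```python
import asyncio

# An async generator: yield inside an async def function
async def ticker(count, delay):
    for i in range(count):
        await asyncio.sleep(delay)  # non-blocking pause between values
        yield i

async def main():
    # Consume the async generator with async for
    async for value in ticker(3, 0.5):
        print(value)

asyncio.run(main())
```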

ASGI vs WSGI

Description

ASGI (Asynchronous Server Gateway Interface) and WSGI (Web Server Gateway Interface) are both specifications for web servers to communicate with Python web applications, but they are designed with different programming paradigms in mind.

### WSGI

WSGI is a synchronous standard, defined in PEP 333 (updated for Python 3 in PEP 3333), that allows a web server to communicate with a Python web application. It's a simple, traditional model: the application is called with an environment (`environ`) dictionary containing the request information and a `start_response` callable to initiate the response. Because WSGI is synchronous, each worker (thread or process) handles one request at a time, making it best suited for applications with limited parallel processing needs.
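To make the calling convention concrete, here is a minimal WSGI application served with the standard library's `wsgiref` (a sketch for local experimentation, not a production setup):

```python
def app(environ, start_response):
    # environ carries the request data; start_response begins the response
    path = environ.get('PATH_INFO', '/')
    start_response('200 OK', [('Content-Type', 'text/plain')])
    return [f'Hello from WSGI at {path}'.encode('utf-8')]

if __name__ == '__main__':
    from wsgiref.simple_server import make_server
    make_server('127.0.0.1', 8000, app).serve_forever()
```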

### ASGI

ASGI, on the other hand, is an asynchronous standard designed to support asynchronous Python features such as the `asyncio` library. ASGI applications can handle multiple requests concurrently within a single process, making them more scalable and performant for applications that manage many simultaneous connections or have a lot of wait time for I/O operations.
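For comparison, the raw ASGI interface is just a coroutine that receives and sends event dicts; in practice you would reach for a framework like FastAPI, but a bare hello-world looks roughly like this (a sketch, runnable with an ASGI server such as `uvicorn module:app`):

```python
async def app(scope, receive, send):
    # scope describes the connection; HTTP requests have scope['type'] == 'http'
    assert scope['type'] == 'http'
    await send({
        'type': 'http.response.start',
        'status': 200,
        'headers': [(b'content-type', b'text/plain')],
    })
    await send({
        'type': 'http.response.body',
        'body': b'Hello from ASGI',
    })
```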

### Comparison

Here’s a brief comparison:

  • Concurrency: ASGI can handle many concurrent connections, making it suitable for WebSockets, long-poll HTTP, and other long-lived connections.

  • Compatibility: WSGI is an older standard and is supported by almost all Python web frameworks, whereas ASGI support is growing but not as ubiquitous.

  • Complexity: ASGI applications can be more complex due to the asynchronous nature of the code.

  • Performance: ASGI applications can be more efficient with resources, particularly with I/O-bound operations and when leveraging HTTP/2 or Server Sent Events (SSE).

### ASGI with HTTP/2 and SSE

HTTP/2 is a major revision of the HTTP network protocol that allows multiple concurrent requests over a single TCP connection, known as multiplexing. This is a good match for ASGI's asynchronous capabilities, allowing ASGI to manage these multiple requests concurrently without the overhead of creating multiple threads or processes.

Server Sent Events (SSE) are a standard allowing a server to push real-time updates to a client over an HTTP connection. SSE requires the server to keep a connection open and send events when new updates are available.

ASGI is well-suited for SSE because:

  • Asynchronous Handling: ASGI can efficiently manage the open connections typically used in SSE without keeping threads blocked, as the server only sends data to the client when an event is available.

  • Scalability: With its ability to handle numerous connections in a single process, ASGI servers can support many clients simultaneously, sending updates to each client as needed.

  • Compatibility with HTTP/2: Combining the multiplexing capabilities of HTTP/2 with ASGI makes it easier to handle multiple SSE streams over a single connection.

Thus, ASGI, when used with HTTP/2, becomes an excellent candidate for building applications with SSE because it offers a performant and scalable way to handle real-time, server-initiated communications with clients.

Illustration


sequenceDiagram
    participant Client
    participant Server
    Client->>Server: GET /stream
    Note over Client: Header: Accept: text/event-stream
    Note over Server: Establishes stream connection
    loop Streaming data
        Server-->>Client: data: {"event": "message", "data": "JSON payload"}
        Note over Server: Header: Content-Type: text/event-stream
    end
    Note over Client: Processes each received event

Server Sent Events (SSE)

Description

Server Sent Events (SSE) is a technology that allows a server to send real-time updates to a web page over a single persistent HTTP connection. This is a technique used to build applications that need to update the client in real-time, such as news feeds, social media updates, or live scores.

Here's a brief explanation suitable for a web newbie:

Normally, when you visit a webpage, your browser makes a request to a server, the server sends the requested page back, and that's it. The connection is closed until you make another request (like clicking a link or refreshing the page). However, sometimes you want to receive continuous updates from the server without needing to ask for them (polling) each time. That's what SSE is for.

With SSE, the server keeps the connection open after sending the initial response and can then send new data whenever it becomes available. This is extremely useful for delivering updates in real-time.
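On the wire, each event is just a few `field: value` lines terminated by a blank line; the browser's `EventSource` reassembles them. A small helper sketch makes the framing explicit (the `format_sse` name is ours, for illustration):

```python
def format_sse(data: str, event: str | None = None) -> str:
    # An optional 'event:' line names the topic; 'data:' carries the payload;
    # a blank line terminates the frame.
    frame = f'event: {event}\n' if event else ''
    frame += f'data: {data}\n\n'
    return frame

# format_sse('{"score": 3}', event='live-score') produces:
# event: live-score
# data: {"score": 3}
# (blank line)
```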

Illustration

Below is a Mermaid sequence diagram of the SSE flow from server to client, including payload shapes with topic and data:


sequenceDiagram
    participant Client
    participant Server

    Note over Client,Server: Connection Initialization
    Client->>Server: GET /events
    Note over Server: Set "Content-Type: text/event-stream"
    Server-->>Client: HTTP 200 OK

    Note over Client,Server: Streaming Events
    loop Every Time an Event Is Sent
        Note over Server: Server prepares an event with a specific topic and data
        Server->>Client: event: user-update\ndata: {"userId": 1, "status": "active"}\n\n
        Note over Client: Event received, handling user-update

        Server->>Client: event: message\ndata: {"chatId": 42, "text": "Hello there!"}\n\n
        Note over Client: Event received, handling message
    end

    Note over Client,Server: Connection Closed (either by client or server)
    Server--xClient: Connection closed
    Client--xServer: Connection closed

You can paste this text into a Mermaid live editor or include it in a GitHub Markdown file to render a visual diagram. If you're drawing it manually, represent the Client and Server as actors, draw a line for the streaming connection, and then draw the payloads as labeled shapes (such as bubbles or rectangles) containing the event type and the JSON data you expect to pass. The event arrows should point from the server to the client, showcasing the unidirectional nature of SSE.

Code

Here's a minimal end-to-end example:

Client-side code (HTML + JavaScript)

The following example shows how you might implement a simple web page that listens to SSE using JavaScript.

<!DOCTYPE html>
<html>
<head>
  <title>Server Sent Events Example</title>
</head>
<body>
  <h1>Real-time Updates</h1>
  <div id="updates"></div>

  <script>
    // Create a new EventSource instance that connects to the SSE endpoint
    const eventSource = new EventSource('/events');

    eventSource.onmessage = function(event) {
      // This function is called when a message is received
      const messageData = event.data;

      // Append the new data to the 'updates' div
      const updatesElement = document.getElementById('updates');
      updatesElement.innerHTML += messageData + '<br>';
    };

    eventSource.onerror = function(error) {
      // Handle any errors that occur
      console.log('EventSource failed: ', error);
    };

    // When you're done listening to events
    // eventSource.close();
  </script>
</body>
</html>

Server-side code (using Node.js)

On the server side, you would have an endpoint that streams updates to the client using SSE. The following example uses Node.js with the Express framework.

const express = require('express');
const app = express();

app.get('/events', (req, res) => {
  // Headers to set up SSE
  res.writeHead(200, {
    'Content-Type': 'text/event-stream',
    'Cache-Control': 'no-cache',
    'Connection': 'keep-alive'
  });

  const sendEvent = (data) => {
    res.write(`data: ${JSON.stringify(data)}\n\n`); // Send the data to the client
  };

  // Send an update every second
  const intervalId = setInterval(() => {
    const message = { text: 'Hello World', timestamp: new Date() };
    sendEvent(message);
  }, 1000);

  // Close the connection when the client disconnects
  req.on('close', () => {
    clearInterval(intervalId);
  });
});

const PORT = 3000;
app.listen(PORT, () => {
  console.log(`Server is running on port ${PORT}`);
});

To run this example, you would need to set up Node.js on your system, install Express (`npm install express`), and save the server-side code in a file (e.g., `server.js`). Then, you could simply run your server with `node server.js`, and point your web browser to the client-side HTML page to start receiving events.

Supporting Libraries

We'll manage the project's dependencies with Poetry:

poetry init # initialize a new poetry project in your current directory

OpenAI

OpenAI is an AI research and deployment company. Their mission is to ensure that artificial general intelligence benefits all of humanity.

GPT-4 is OpenAI’s most advanced system, producing safer and more useful responses. GPT-4 can solve difficult problems with greater accuracy, thanks to its broader general knowledge and problem solving abilities.

DALL·E 3 understands significantly more nuance and detail than OpenAI's previous systems, allowing you to easily translate your ideas into exceptionally accurate images.


poetry add openai # add openai
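As a quick sanity check after installing, a minimal non-streaming chat completion with the v1 client looks roughly like this (assumes the `OPENAI_API_KEY` environment variable is set):

```python
from openai import OpenAI

client = OpenAI()  # reads OPENAI_API_KEY from the environment
response = client.chat.completions.create(
    model="gpt-4",
    messages=[
        {"role": "system", "content": "You are a helpful assistant."},
        {"role": "user", "content": "Say hello in one sentence."},
    ],
)
print(response.choices[0].message.content)
```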

FastAPI

FastAPI is a modern, fast (high-performance) web framework for building APIs with Python 3.8+ based on standard Python type hints.


poetry add "fastapi[all]" "uvicorn[standard]" # add fastapi and uvicorn

Jinja2

Jinja is a fast, expressive, extensible templating engine. Special placeholders in the template allow writing code similar to Python syntax. Then the template is passed data to render the final document.

Installation


poetry add jinja2 # add jinja2
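A quick taste of the engine, rendering a template from a string:

```python
from jinja2 import Template

template = Template("Hello, {{ name }}!")
print(template.render(name="Doggo"))  # -> Hello, Doggo!
```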

Code

templates/doggo.jinja2



<html>
<head>
    <link rel="stylesheet" href="https://cdn.jsdelivr.net/npm/bulma@0.9.4/css/bulma.min.css">
    <script src="https://unpkg.com/htmx.org@1.9.10"></script>
    <script src="https://unpkg.com/htmx.org@1.9.10/dist/ext/sse.js"></script>
    <title>Formulario | Page Doggo</title>
</head>
<body>
    <h1>Dog Breeds as Server Sent Events</h1>
    <hr>
    <div id="doggo-sse-listener" hx-ext="sse" sse-connect="/dogstream" sse-swap="Terminate,DogBreedNoMass,DogBreed">
    </div>
    <b>
        <div id="DogBreedNoMass"></div>
        <br>
        <div id="DogBreed"></div>
    </b>
{#    <div hx-ext="sse" sse-connect="/dogstream">#}
{#        <b>#}
{#            <div sse-swap="DogBreedNoMass"></div>#}
{#            <br>#}
{#            <div sse-swap="DogBreed"></div>#}
{#        </b>#}
{#    </div>#}
</body>
</html>

templates/openai.jinja2


<html>
<head>
    <link rel="stylesheet" href="https://cdn.jsdelivr.net/npm/bulma@0.9.4/css/bulma.min.css">
    <script src="https://unpkg.com/htmx.org@1.9.10"></script>
    <script src="https://unpkg.com/htmx.org@1.9.10/dist/ext/sse.js"></script>
    <title>Page | Raw OpenAI Response</title>
</head>
<body>
<h1>OpenAI Response as Server Sent Events</h1>
<hr>
<div hx-ext="sse" sse-connect="/openaistream">
    <article class="message is-info">
        <div class="message-header">
            <p sse-swap="ResponseNoMass">Info</p>
            <button class="delete" aria-label="delete"></button>
        </div>
{#        <div class="message-body" sse-swap="Response" hx-swap="beforeend">#}
{#        </div>#}
    </article>

    <div class="message-body" sse-swap="Response" hx-swap="innerHTML">
    </div>
</div>
</body>
</html>

HTMX

htmx gives you access to AJAX, CSS Transitions, WebSockets and Server Sent Events directly in HTML, using attributes, so you can build modern user interfaces with the simplicity and power of hypertext.

Code

templates/layout.jinja2


<html>
<head>
    <meta charset="UTF-8">
    <meta name="viewport" content="width=device-width, initial-scale=1.0">

    <script src="https://cdn.tailwindcss.com"></script>

    <script src="https://unpkg.com/htmx.org@1.9.10"></script>
    <script src="https://unpkg.com/htmx.org@1.9.10/dist/ext/sse.js"></script>

    {% block head %}{% endblock %}
</head>
<body hx-boost="true">
{% block body %}{% endblock %}
{% block scripts %}{% endblock %}
</body>
</html>

templates/partials/sse.jinja2


{% macro sse_stream(sse_config) %}
    <div id="{{ sse_config.listener }}" hx-ext="sse" sse-connect="{{ sse_config.path }}" sse-swap="{{ sse_config.topics | join(',') }}">
    </div>
{% endmacro %}

templates/partials/streaming_chunk.jinja2


event: {{ event }}
data: <div {% for name, value in attrs.items() %} {{ name }}="{{ value }}" {% endfor %}>{{ chunk }}</div>

templates/partials/ai_message.jinja2


{% macro ai_msg(message) %}

    <div class="flex gap-3 my-4 text-gray-600 text-sm flex-1">
        <span class="relative flex shrink-0 overflow-hidden rounded-full w-8 h-8">
            <div class="rounded-full bg-gray-100 border p-1">
                <svg stroke="none" fill="black" stroke-width="1.5"
                     viewBox="0 0 24 24" aria-hidden="true" height="20" width="20"
                     xmlns="http://www.w3.org/2000/svg">
                    <path stroke-linecap="round" stroke-linejoin="round"
                          d="M9.813 15.904L9 18.75l-.813-2.846a4.5 4.5 0 00-3.09-3.09L2.25 12l2.846-.813a4.5 4.5 0 003.09-3.09L9 5.25l.813 2.846a4.5 4.5 0 003.09 3.09L15.75 12l-2.846.813a4.5 4.5 0 00-3.09 3.09zM18.259 8.715L18 9.75l-.259-1.035a3.375 3.375 0 00-2.455-2.456L14.25 6l1.036-.259a3.375 3.375 0 002.455-2.456L18 2.25l.259 1.035a3.375 3.375 0 002.456 2.456L21.75 6l-1.035.259a3.375 3.375 0 00-2.456 2.456zM16.894 20.567L16.5 21.75l-.394-1.183a2.25 2.25 0 00-1.423-1.423L13.5 18.75l1.183-.394a2.25 2.25 0 001.423-1.423l.394-1.183.394 1.183a2.25 2.25 0 001.423 1.423l1.183.394-1.183.394a2.25 2.25 0 00-1.423 1.423z">
                    </path>
                </svg>
            </div>
        </span>
        <p class="leading-relaxed">
            <span class="block font-bold text-gray-700">AI </span>
            {{ message }}
            <span id="Response"></span>
        </p>
    </div>
{% endmacro %}

templates/partials/user_message.jinja2


{% macro user_msg(message) %}

<div class="flex gap-3 my-4 text-gray-600 text-sm flex-1"><span
    class="relative flex shrink-0 overflow-hidden rounded-full w-8 h-8">
    <div class="rounded-full bg-gray-100 border p-1"><svg stroke="none" fill="black" stroke-width="0"
        viewBox="0 0 16 16" height="20" width="20" xmlns="http://www.w3.org/2000/svg">
        <path
          d="M8 8a3 3 0 1 0 0-6 3 3 0 0 0 0 6Zm2-3a2 2 0 1 1-4 0 2 2 0 0 1 4 0Zm4 8c0 1-1 1-1 1H3s-1 0-1-1 1-4 6-4 6 3 6 4Zm-1-.004c-.001-.246-.154-.986-.832-1.664C11.516 10.68 10.289 10 8 10c-2.29 0-3.516.68-4.168 1.332-.678.678-.83 1.418-.832 1.664h10Z">
        </path>
      </svg></div>
  </span>
  <p class="leading-relaxed">
      <span class="block font-bold text-gray-700">You </span>
      {{ message }}
  </p>
</div>
{% endmacro %}

templates/index.jinja2


{% extends "layout.jinja2" %}

{% from "partials/sse.jinja2" import sse_stream %}
{% from "partials/ai_message.jinja2" import ai_msg %}
{% from "partials/user_message.jinja2" import user_msg %}

{% block body %}
    
    <button class="fixed bottom-4 right-4 inline-flex items-center justify-center text-sm font-medium disabled:pointer-events-none disabled:opacity-50 border rounded-full w-16 h-16 bg-black hover:bg-gray-700 m-0 cursor-pointer border-gray-200 bg-none p-0 normal-case leading-5 hover:text-gray-900"
            type="button" aria-haspopup="dialog" aria-expanded="false" data-state="closed">
    <svg xmlns="http://www.w3.org/2000/svg" width="30" height="40" viewBox="0 0 24 24" fill="none"
         stroke="currentColor" stroke-width="2" stroke-linecap="round" stroke-linejoin="round"
         class="text-white block border-gray-200 align-middle">
        <path d="m3 21 1.9-5.7a8.5 8.5 0 1 1 3.8 3.8z" class="border-gray-200">
        </path>
    </svg>
  </button>

  <div class="md:container md:mx-auto" style="box-shadow: 0 0 #0000, 0 0 #0000, 0 1px 2px 0 rgb(0 0 0 / 0.05);"
    class="fixed bottom-[calc(4rem+1.5rem)] right-40 mr-4 bg-white p-6 rounded-lg border border-[#e5e7eb] w-[3/4] h-[634px]">

    
    <div class="flex flex-col space-y-1.5 pb-6">
      <h2 class="font-semibold text-lg tracking-tight">Custom Chatbot</h2>
      <p class="text-sm text-[#6b7280] leading-3">Powered by Your OpenAI Key</p>
    </div>

    
    <div class="pr-4 h-[474px]" style="min-width: 100%; display: table;">
      {% if sse_config %}
          {{ sse_stream(sse_config) }}
      {% endif %}
      {% for message in messages %}
          {% if message.sender == 'ai' %}
              {{ ai_msg(message.content) }}
          {% endif %}
          {% if message.sender == 'user' %}
              {{ user_msg(message.content) }}
          {% endif %}
      {% endfor %}
    </div>

    
    <div class="flex items-center pt-0">
      <form
        action="{{ url_for('openai', req_id=req_id) }}" method="POST"
        class="flex items-center justify-center w-full space-x-2"
      >
        <input
          class="flex h-10 w-full rounded-md border border-[#e5e7eb] px-3 py-2 text-sm placeholder-[#6b7280] focus:outline-none focus:ring-2 focus:ring-[#9ca3af] disabled:cursor-not-allowed disabled:opacity-50 text-[#030712] focus-visible:ring-offset-2"
          type="text" name="user_prompt"
          placeholder="Message ChatGPT..." value="">
        <input
          type="submit"
          class="inline-flex items-center justify-center rounded-md text-sm font-medium text-[#f9fafb] disabled:pointer-events-none disabled:opacity-50 bg-black hover:bg-[#111827E6] h-10 px-4 py-2"
          value="Send message">
      </form>
    </div>

  </div>

{% endblock %}

Markdown

Markdown is a lightweight markup language that you can use to add formatting elements to plaintext text documents. Created by John Gruber in 2004, Markdown is now one of the world’s most popular markup languages.


sequenceDiagram
    participant M as Markdown File
    participant P as Markdown Processor
    participant H as HTML File
    participant B as Web Browser
    M->>P: Process Markdown
    P->>H: Generate HTML
    H->>B: Render in Browser

This sequence diagram illustrates the simple flow from a Markdown file through processing to rendering as follows:

  1. The "Markdown File" acts as the input.

  2. The "Markdown Processor" converts the Markdown to HTML.

  3. The resulting "HTML File" is then ready.

  4. Finally, the "Web Browser" renders the HTML file to display the output to the user.
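In code, that "Markdown Processor" step is a single call with the `markdown` package this project already depends on (a minimal sketch; output shown roughly):

```python
import markdown

html = markdown.markdown("# Real-time Updates\n\nHello **world**!")
print(html)
# <h1>Real-time Updates</h1>
# <p>Hello <strong>world</strong>!</p>
```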

TailwindCSS

TailwindCSS is a utility-first CSS framework packed with classes like `flex`, `pt-4`, `text-center` and `rotate-90` that can be composed to build any design, directly in your markup.

SQLAlchemy

SQLAlchemy is the Python SQL toolkit and Object-Relational Mapper (ORM). In this project it persists users and chat history to SQLite.

Code


[tool.poetry]
name = "custom-chatbot"
version = "0.1.0"
description = ""
authors = ["ChiefKemist"]
readme = "README.md"

[tool.poetry.dependencies]
python = "^3.12"
Jinja2 = "^3.1.2"
fastapi = "^0.109.0"
python-multipart = "^0.0.6"
uvicorn = {extras = ["standard"], version = "^0.25.0"}
httpx = "^0.26.0"
openai = "^1.7.2"
sqlalchemy = "^2.0.25"
markdown = "^3.5.2"
pygments = "^2.17.2"
semantic-router = "^0.0.18"
rich = "^13.7.0"
## Extra
# langchain = "^0.1.0"
# fastui = "^0.4.0"

[tool.poetry.group.dev.dependencies]
pytest = "^7.4.4"
black = "^23.12.1"

[build-system]
requires = ["poetry-core"]
build-backend = "poetry.core.masonry.api"

CLI ChatApp

Chat context

Chat history persistence

Code


import logging
import sys
import hashlib

import openai

from sqlalchemy import create_engine, Column, Integer, String, DateTime
from sqlalchemy.orm import declarative_base, sessionmaker
from datetime import datetime
from getpass import getpass

from rich.console import Console

logging.basicConfig(
    stream=sys.stdout, level=logging.DEBUG,
    format='%(asctime)s %(levelname)s [%(module)s] %(message)s',
)
log = logging.getLogger(__name__)

console = Console()

# SQLAlchemy setup
Base = declarative_base()


class User(Base):
    __tablename__ = 'users'
    id = Column(Integer, primary_key=True)
    username = Column(String, unique=True)
    password_hash = Column(String)


class Message(Base):
    __tablename__ = 'messages'
    id = Column(Integer, primary_key=True)
    chat_room = Column(String)
    sender = Column(String)
    message = Column(String)
    timestamp = Column(DateTime, default=datetime.now)


# SQLite database connection
engine = create_engine('sqlite:///chat.db')
Base.metadata.create_all(engine)
Session = sessionmaker(bind=engine)
session = Session()


def hash_password(password):
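    # NOTE: an unsalted SHA-256 hash keeps this demo simple; real applications
    # should use a dedicated password hasher such as bcrypt or argon2.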
    return hashlib.sha256(password.encode()).hexdigest()


def register_user():
    username = input("Enter new username: ")
    if session.query(User).filter_by(username=username).first():
        console.log(
            "Username already exists. Please try a different username.",
            style="bold red"
        )
        return None

    password = getpass("Enter new password: ")
    hashed_password = hash_password(password)
    new_user = User(username=username, password_hash=hashed_password)
    session.add(new_user)
    session.commit()
    return username


def login_user():
    username = input("Enter username: ")
    password = getpass("Enter password: ")
    hashed_password = hash_password(password)

    user = session.query(User).filter_by(username=username, password_hash=hashed_password).first()
    if user:
        return username
    else:
        console.log("Invalid username or password.", style="bold red")
        return None


def save_message(chat_room, sender, message):
    new_message = Message(chat_room=chat_room, sender=sender, message=message)
    session.add(new_message)
    session.commit()


def get_chat_history(chat_room):
    messages = session.query(Message).filter_by(chat_room=chat_room).order_by(Message.timestamp).all()
    return [f"{message.sender}: {message.message}" for message in messages]


def get_gpt4_response(prompt, chat_history):
    # openai.api_key = 'your-api-key'  # Replace with your actual OpenAI API key

    combined_prompt = "\n".join(chat_history[-50:]) + f"\n{prompt}"  # Limit history to last 50 messages
    response = openai.OpenAI().chat.completions.create(
        model="gpt-4",
        messages=[
            {"role": "system", "content": "You are a helpful assistant."},
            {"role": "user", "content": combined_prompt}
        ]
    )
    return response.choices[0].message.content


def run_chat(username):
    chat_room = username
    console.log(
        f"Welcome to your personal GPT-4 Chat CLI, {username}. Type 'quit' to exit.",
        style="bold blue"
    )

    while True:
        user_input = input("You: ")
        if user_input.lower() == 'quit':
            break

        chat_context = get_chat_history(chat_room)
        gpt_response = get_gpt4_response(user_input, chat_context)

        save_message(chat_room, "You", user_input)
        save_message(chat_room, "GPT-4", gpt_response)

        console.log(f"GPT-4: {gpt_response}", style="bold green")

    console.log("Chat ended.", style="bold blue")


def main():
    log.info("Welcome to the Chat Application")
    choice = input("Do you want to [L]ogin or [R]egister? (L/R): ").lower()

    username = None
    while not username:
        if choice == 'r':
            username = register_user()
        elif choice == 'l':
            username = login_user()
        else:
            choice = input("Please enter 'L' to login or 'R' to register: ").lower()

    if username:
        run_chat(username)


if __name__ == "__main__":
    main()

Streaming Doggos

Description

Stream dog breeds to the browser, leveraging Python's asyncio and async generators together with htmx. The dog breeds are provided by the dog.ceo API.

Code

curl https://dog.ceo/api/breeds/list/all

async_doggo.py


#!/usr/bin/env python3

import asyncio
import logging
import sys
import typing

import httpx
from fastapi import FastAPI, Request
from fastapi.responses import HTMLResponse, StreamingResponse
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
from jinja2 import Environment, FileSystemLoader, select_autoescape

# from pygments.formatters import HtmlFormatter

logging.basicConfig(
    stream=sys.stdout, level=logging.DEBUG,
    format='%(asctime)s %(levelname)s [%(module)s] %(message)s',
)
log = logging.getLogger(__name__)

# log.addHandler(logging.StreamHandler(sys.stdout))

app = FastAPI()

app.mount("/static", StaticFiles(directory="static"), name="static")


def app_context(request: Request) -> typing.Dict[str, typing.Any]:
    return {'app': request.app}


templates = Jinja2Templates(
    directory="templates",
    context_processors=[app_context],
)


@app.get("/", response_class=HTMLResponse)
async def index(request: Request):
    return templates.TemplateResponse(
        request=request, name="doggo.jinja2",
    )


def render_sse_html_chunk(event, chunk, attrs=None):
    if attrs is None:
        attrs = {}
    tmpl = Environment(
        loader=FileSystemLoader('templates/partials'),
        autoescape=select_autoescape(['html'])
    ).select_template(['streaming_chunk.jinja2'])
    html_chunk = tmpl.render(**dict(event=event, chunk=chunk, attrs=attrs))
    return html_chunk


async def gen_dog_breeds():
    async with httpx.AsyncClient() as client:
        breeds = (await client.get('https://dog.ceo/api/breeds/list/all')).json()
        for breed in breeds['message'].keys():
            log.info(f"Yielding {breed}")
            yield breed


@app.get("/dogstream", response_class=StreamingResponse)
async def dogstream(request: Request):
    async def dogbreeds_iter():
        async for breed in gen_dog_breeds():
            await asyncio.sleep(0.2)
            breed_status_chunk = render_sse_html_chunk(
                'DogBreedNoMass',
                'More doggo senior :-)',
                {
                    'id': 'DogBreedNoMass',
                    # 'hx-swap-oob': 'beforeend',
                    'hx-swap-oob': 'true',
                },
            )
            yield f'{breed_status_chunk}\n\n'.encode('utf-8')
            await asyncio.sleep(0.2)
            chunk = render_sse_html_chunk(
                'DogBreed',
                breed,
                {
                    'id': 'DogBreed',
                    # 'hx-swap-oob': 'beforeend',
                    'hx-swap-oob': 'true',
                },
            )
            yield f'{chunk}\n\n'.encode('utf-8')
        breed_status_chunk = render_sse_html_chunk(
            'DogBreedNoMass',
            'No more doggo senior :-(',
            {
                'id': 'DogBreedNoMass',
                'hx-swap-oob': 'true',
            },
        )
        yield f'{breed_status_chunk}\n\n'.encode('utf-8')
        # chunk = render_sse_html_chunk(
        #     'Terminate',
        #     '',
        #     {
        #         'id': 'doggo-sse-listener',
        #         'hx-swap-oob': 'true',
        #     },
        # )
        # yield f'{chunk}\n\n'.encode('utf-8')

    return StreamingResponse(
        dogbreeds_iter(),
        media_type='text/event-stream',
    )


if __name__ == '__main__':
    import uvicorn

    uvicorn.run('async_doggo:app', host='0.0.0.0', port=6543, reload=True)
    # uvicorn.run('async_doggo:app', host='0.0.0.0', port=6543, workers=4)

Streaming OpenAI

Raw / Plain

Better looking

Rich Text

async_chat.py


#!/usr/bin/env python3

import asyncio
import functools
import concurrent.futures
import logging
import re
import sys
import queue
import typing
import uuid
from io import StringIO

from time import sleep

import httpx
import markdown
# from pygments.formatters import HtmlFormatter
from pygments import highlight
from pygments.lexers import get_lexer_by_name
from pygments.formatters.html import HtmlFormatter

from fastapi import FastAPI, Request, Form
from fastapi.responses import HTMLResponse, StreamingResponse
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
from jinja2 import Environment, FileSystemLoader, select_autoescape

logging.basicConfig(
    stream=sys.stdout, level=logging.DEBUG,
    format='%(asctime)s %(levelname)s [%(module)s] %(message)s',
)
log = logging.getLogger(__name__)

# log.addHandler(logging.StreamHandler(sys.stdout))

app = FastAPI()

app.mount("/static", StaticFiles(directory="static"), name="static")

def app_context(request: Request) -> typing.Dict[str, typing.Any]:
    return {'app': request.app}

templates = Jinja2Templates(
    directory="templates",
    context_processors=[app_context],
)


@app.get("/", response_class=HTMLResponse)
async def index(request: Request):
    req_id = str(uuid.uuid4())
    messages = [
        {'sender': 'ai', 'content': 'Hi, how can I help you today?'},
        {'sender': 'user', 'content': 'faslskadalksjioqjeqlkj'},
        {'sender': 'ai',
         'content': 'Sorry, I couldn\'t find any information in the documentation about that. '
                    'Expect the answer to be less accurate; I could not find the answer to this in the verified sources.'},
    ]
    # TODO: Add div to last AI block to trigger rendering of SSE response
    return templates.TemplateResponse(
        request=request, name="index.jinja2",
        context=dict(messages=messages, req_id=req_id)
    )


reqs = {}


@app.post("/openai/{req_id}", response_class=HTMLResponse)
async def openai(request: Request, req_id: str, user_prompt: typing.Annotated[str, Form()]):

    log.debug(f'User prompt: {user_prompt}')

    reqs[req_id] = user_prompt

    messages = [
        {'sender': 'user', 'content': user_prompt},
        {'sender': 'ai', 'content': ''},
    ]

    sse_config = dict(
        listener='openai',
        path=f'/openaistream/{req_id}',
        topics=[
            'Response',
            'ResponseNoMass',
            'Terminate',
        ]
    )

    new_req_id = str(uuid.uuid4())

    return templates.TemplateResponse(
        request=request, name="index.jinja2",
        context=dict(
            messages=messages, req_id=new_req_id, sse_config=sse_config
        )
    )


def render_sse_html_chunk(event, chunk, attrs=None):
    if attrs is None:
        attrs = {}
    tmpl = Environment(
        loader=FileSystemLoader('templates/partials'),
        autoescape=select_autoescape(['html'])
    ).select_template(['streaming_chunk.jinja2'])
    html_chunk = tmpl.render(**dict(event=event, chunk=chunk, attrs=attrs))
    return html_chunk


def markdown_to_html_with_highlighting(source_markdown):
    # Configure Markdown to use the 'fenced_code' extension with Pygments
    md = markdown.Markdown(extensions=['fenced_code', 'codehilite'])

    # Convert Markdown to HTML
    html = md.convert(source_markdown)

    # Generate CSS for syntax highlighting
    css = HtmlFormatter().get_style_defs('.codehilite')

    # return css + html
    return f"{html.replace('\n', '
')}" def markdown_to_html_with_inline_highlighting(source_markdown): # Create an instance of HtmlFormatter with inline styles formatter = HtmlFormatter(style='default', cssclass='', noclasses=True) # Custom inline code highlighter def inline_highlight(match): language = match.group('lang') code = match.group('code') lexer = get_lexer_by_name(language, stripall=True) # highlighted_code = highlight(code, lexer, formatter) highlighted_code = highlight(code.replace('
', '\n'), lexer, formatter) return highlighted_code.replace('\n', '
') # return highlighted_code.replace('
', '
')
return highlighted_code # Replace fenced code blocks with highlighted code highlighted_markdown = re.sub( r'```(?P\w+)\s*(?P.*?)```', inline_highlight, source_markdown, flags=re.DOTALL ) # Convert Markdown to HTML html = markdown.markdown(highlighted_markdown) return html async def run_openai(req_id: str): from time import perf_counter from openai import AsyncOpenAI # user_prompt = 'What do you do?' # user_prompt = 'What up though?' user_prompt = reqs[req_id] log.debug(f'User prompt: {user_prompt}') messages = [ {'role': 'system', 'content': 'please response in markdown only.'}, {'role': 'user', 'content': user_prompt}, ] chunks = await AsyncOpenAI().chat.completions.create( model='gpt-4', messages=messages, stream=True, ) last = None result_chunks = [] # result_concat = '' result_concat = StringIO() code_start = False tick_count = 0 async for chunk in chunks: now = perf_counter() if last is not None: t = now - last else: t = 0 text = chunk.choices[0].delta.content # print(repr(text), t) # log.debug(f'Chunk: {text}') if text is not None: result_chunks.append((t, text)) result_concat.write(f"{text}") mdText = markdown_to_html_with_inline_highlighting( result_concat.getvalue().replace('\n', "
") ) yield mdText # await asyncio.sleep(0.4) else: log.debug('No text adding space') # await asyncio.sleep(0.2) # result_concat += ' ' # result_concat += '\n' # result_concat.write(' ') # # mdText = markdown.markdown(result_concat) # mdText = markdown.markdown(result_concat.getvalue()) # yield mdText last = now # log.debug(f'Final result: {result_concat.getvalue()}') # yield markdown.markdown(result_concat.getvalue().replace('\n', '
'))
# await asyncio.sleep(0.8) yield None # All Done # log.debug('OpenAI Chat Queueing Done') # log.debug(result_chunks) text = ''.join(text for _, text in result_chunks) @app.get("/openaistream/{req_id}", response_class=StreamingResponse) async def openaistream(request: Request, req_id: str): log.info(f"Request ID: {req_id}") # log.info(f"Request ID: {request.matchdict['req_id']}") async def openai_iter(): response_parts = [] async for resp in run_openai(req_id): if resp is None: chunk = render_sse_html_chunk( 'Terminate', '', { 'id': 'openai', 'hx-swap-oob': 'true', }, ) yield f'{chunk}\n\n'.encode('utf-8') # raise StopAsyncIteration break chunk = render_sse_html_chunk( 'Response', resp, { 'id': 'Response', # 'hx-swap-oob': 'beforeend', 'hx-swap-oob': 'true', }, ) yield f'{chunk}\n\n'.encode('utf-8') chunk = render_sse_html_chunk( 'Terminate', '', { 'id': 'openai', 'hx-swap-oob': 'true', }, ) yield f'{chunk}\n\n'.encode('utf-8') return StreamingResponse( openai_iter(), media_type='text/event-stream', ) if __name__ == '__main__': import uvicorn uvicorn.run('async_chat:app', host='0.0.0.0', port=6543, reload=True) # uvicorn.run('async_chat:app', host='0.0.0.0', port=6543, workers=4)

Chat WebApp with Assistant

Assistant API

Persist Assistant Details

Assistant Data

Multi Threaded Chats with Context and Persistence

Semantic Router

Extra

Deployment

FastAPI

FastUI

References