Xbeat: Supercharge E-commerce with Real-Time AI Powered by Redis 8
Publish Date: Aug 11
This is a submission for the Redis AI Challenge: Real-Time AI Innovators.
What I Built
The Xbeat project is an e‑commerce conversational agent that treats Redis 8 as its real-time AI data plane. When a shopper asks a question like “Show me noise‑cancelling headphones under $1000,” the Xbeat AI agent does not blindly forward the text to an LLM. It first performs a semantic cache lookup in Redis to detect near‑duplicate questions; if one is found, the stored answer is streamed immediately to the UI and the response is labeled with an X-Cache: hit header. If no near‑duplicate exists, the system embeds the question, streams a fresh answer from the model, and writes that response back into Redis as a vector‑addressable entry. This turns repeated queries into instant responses, keeps answers grounded in data the app controls, and avoids unnecessary model calls.
Redis is not a passive store in Xbeat. It actively decides whether to reuse or recompute every answer. The heart of the system is a RediSearch index that stores chat prompts, responses, and their embeddings. On each turn, the server computes an embedding for the last user message, executes a KNN(1) query over the vector field using cosine distance, and compares the nearest neighbor’s score to a tunable threshold. When the threshold is met, the cached response is streamed; when it is not, the model generates a fresh answer that is then persisted back into Redis with a TTL so the cache evolves in real time with the workload.
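To make the threshold concrete: with the COSINE metric, RediSearch returns a cosine distance, i.e. 1 minus cosine similarity, so lower scores mean more similar and a threshold of 0.1 only accepts very close matches. A small standalone illustration of the math (not part of Xbeat — Redis computes this server-side during the KNN query):

```javascript
// Cosine distance as RediSearch computes it for DISTANCE_METRIC 'COSINE':
// 1 - (a · b) / (|a| * |b|). Lower means more similar; 0 is identical direction.
function cosineDistance(a, b) {
  let dot = 0, normA = 0, normB = 0;
  for (let i = 0; i < a.length; i++) {
    dot += a[i] * b[i];
    normA += a[i] * a[i];
    normB += b[i] * b[i];
  }
  return 1 - dot / (Math.sqrt(normA) * Math.sqrt(normB));
}

cosineDistance([1, 0], [2, 0]); // 0 → within a 0.1 threshold, a cache hit
cosineDistance([1, 0], [0, 1]); // 1 → far outside the threshold, a cache miss
```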
Redis Client and Connection Reuse
The Redis client is created once and cached at process scope, with TLS support for rediss:// URLs, so serverless handlers can reuse the connection across invocations. From api/_lib/redisClient.js:
```javascript
// api/_lib/redisClient.js
const { createClient } = require('redis');

let cachedClient = globalThis.__xbeatRedisClient || null;

function getRedisUrl() {
  return process.env.REDIS_URL || process.env.REDIS_URL_FALLBACK || '';
}

function getRedisPassword() {
  return process.env.REDIS_PASSWORD;
}

function isRedisConfigured() {
  return Boolean(getRedisUrl());
}

async function getRedisClient() {
  if (!isRedisConfigured()) return null;
  if (cachedClient && cachedClient.isOpen) return cachedClient;

  const url = getRedisUrl();
  const password = getRedisPassword();
  const isTls = typeof url === 'string' && url.startsWith('rediss://');

  const client = createClient({
    url,
    password,
    socket: isTls ? { tls: true } : undefined,
  });

  client.on('error', (err) => console.error('[Redis] Client error:', err));

  try {
    await client.connect();
    globalThis.__xbeatRedisClient = client;
    cachedClient = client;
    return client;
  } catch (err) {
    console.error('[Redis] Failed to connect:', err);
    return null;
  }
}

module.exports = { isRedisConfigured, getRedisClient };
```
Semantic Cache: Vector Index and KNN Lookup
Xbeat stores chat history entries as HASHes and indexes them with RediSearch using an HNSW vector field in FLOAT32 format. The schema, lookup, and write‑back logic live in api/_lib/semanticCache.js:
```javascript
// api/_lib/semanticCache.js
const { SchemaFieldTypes, VectorAlgorithms } = require('redis');

const INDEX_NAME = 'idx:chatcache';
const KEY_PREFIX = 'xbeat:chatcache:';
const VECTOR_DIM = parseInt(process.env.EMBEDDING_DIM || '1536', 10);
const DISTANCE_METRIC = 'COSINE';
const DEFAULT_TTL = parseInt(process.env.CACHE_TTL || '86400', 10);

async function ensureCacheIndex(client) {
  try {
    await client.ft.create(
      INDEX_NAME,
      {
        prompt: { type: SchemaFieldTypes.TEXT },
        response: { type: SchemaFieldTypes.TEXT },
        embedding: {
          type: SchemaFieldTypes.VECTOR,
          TYPE: 'FLOAT32',
          ALGORITHM: VectorAlgorithms.HNSW,
          DIM: VECTOR_DIM,
          DISTANCE_METRIC,
        },
      },
      { ON: 'HASH', PREFIX: KEY_PREFIX }
    );
    console.log(`[SemanticCache] Created index ${INDEX_NAME}`);
  } catch (e) {
    if (typeof e?.message === 'string' && e.message.includes('Index already exists')) {
      // OK — index already created on a previous invocation
    } else {
      console.warn('[SemanticCache] ensureCacheIndex warning:', e);
    }
  }
}

async function findCacheHit(client, embeddingBuffer, threshold = 0.1) {
  const knnQuery = '*=>[KNN 1 @embedding $B AS score]';
  const options = {
    PARAMS: { B: embeddingBuffer },
    RETURN: ['score', 'response', 'prompt'],
    SORTBY: { BY: 'score', DIRECTION: 'ASC' },
    DIALECT: 2,
  };
  const results = await client.ft.search(INDEX_NAME, knnQuery, options);
  if (!results || !results.documents || results.documents.length === 0) return null;

  const doc = results.documents[0];
  const score = parseFloat(doc?.value?.score ?? '1');
  if (!Number.isFinite(score)) return null;

  if (score <= threshold) {
    return {
      key: doc.id,
      prompt: doc?.value?.prompt ?? '',
      response: doc?.value?.response ?? '',
      score,
    };
  }
  return null;
}

async function storeCacheEntry(client, { prompt, response, embeddingBuffer, ttl = DEFAULT_TTL }) {
  const key = KEY_PREFIX + Date.now().toString(36) + '-' + Math.random().toString(36).slice(2, 8);
  await client.hSet(key, {
    prompt: String(prompt || ''),
    response: String(response || ''),
    embedding: embeddingBuffer,
  });
  if (ttl && Number.isFinite(ttl)) await client.expire(key, Math.max(1, Math.floor(ttl)));
  return key;
}

module.exports = { ensureCacheIndex, findCacheHit, storeCacheEntry };
```
Chat Handler: Real‑Time Reuse or Recompute
The /api/chat route integrates the semantic cache into the request path. It parses the UI messages, embeds the last user message, checks Redis for a near‑duplicate, and either streams the cached response or streams a new model response and writes it back. The relevant logic from api/chat.js is below:
```javascript
// api/chat.js (excerpt)
const { getRedisClient, isRedisConfigured } = require('./_lib/redisClient.js');
const {
  ensureCacheIndex,
  findCacheHit,
  storeCacheEntry,
  extractLastUserText,
  extractTextFromUIMessage,
} = require('./_lib/semanticCache.js');

module.exports = async function (req, res) {
  // ...imports and parsing...
  const threshold = Number(process.env.SEMANTIC_DISTANCE_THRESHOLD || '0.1');
  const canUseCache = isRedisConfigured();
  let client = null;

  try {
    if (canUseCache) {
      client = await getRedisClient();
      if (client) {
        await ensureCacheIndex(client);
        const userText = extractLastUserText(uiMessages);
        if (userText && userText.trim().length > 0) {
          const embeddingBuffer = await embedTextToBuffer(userText);
          const hit = await findCacheHit(client, embeddingBuffer, threshold);
          if (hit && hit.response) {
            const stream = createUIMessageStream({
              execute: ({ writer }) => {
                writer.write({ type: 'text', text: hit.response });
              },
            });
            pipeUIMessageStreamToResponse({
              response: res,
              stream,
              headers: { 'X-Cache': 'hit' },
            });
            return;
          }
        }
      }
    }
  } catch (cacheErr) {
    console.warn('[Chat API] Cache error (continuing without cache):', cacheErr);
  }

  const result = streamText({
    model: openai(modelId),
    system:
      'You are a helpful AI shopping assistant for X-Beat (audio gear store). Be concise, friendly, and product-focused.',
    messages: modelMessages,
  });

  const uiStream = result.toUIMessageStream({
    onFinish: async ({ responseMessage }) => {
      try {
        const responseText = extractTextFromUIMessage(responseMessage);
        if (client && responseText && responseText.trim().length > 0) {
          const userText = extractLastUserText(uiMessages);
          const embeddingBuffer = await embedTextToBuffer(userText);
          await storeCacheEntry(client, {
            prompt: userText,
            response: responseText,
            embeddingBuffer,
          });
        }
      } catch (e) {
        console.warn('[Chat API] onFinish error:', e);
      }
    },
  });

  pipeUIMessageStreamToResponse({
    response: res,
    stream: uiStream,
    headers: { 'X-Cache': 'miss' },
  });
};
```
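The handler calls an embedTextToBuffer helper that is not shown in the excerpt. A minimal sketch of what it could look like, assuming the OpenAI REST embeddings endpoint and the text-embedding-3-small model (the function name comes from the excerpt; the model and endpoint choices are assumptions). The key detail is packing the float array into the binary FLOAT32 blob that the RediSearch vector field expects as the $B parameter:

```javascript
// Pack a plain float array into the little-endian FLOAT32 blob that the
// RediSearch vector field expects as the $B query parameter.
function floatsToBuffer(vector) {
  return Buffer.from(new Float32Array(vector).buffer);
}

// Sketch: embed text via the OpenAI embeddings API (assumed model) and
// return a FLOAT32 buffer. The model's output dimension must match
// EMBEDDING_DIM (default 1536) or KNN queries against the index will fail.
async function embedTextToBuffer(text) {
  const res = await fetch('https://api.openai.com/v1/embeddings', {
    method: 'POST',
    headers: {
      Authorization: `Bearer ${process.env.OPENAI_API_KEY}`,
      'Content-Type': 'application/json',
    },
    body: JSON.stringify({ model: 'text-embedding-3-small', input: text }),
  });
  if (!res.ok) throw new Error(`Embedding request failed: ${res.status}`);
  const { data } = await res.json();
  return floatsToBuffer(data[0].embedding);
}
```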
Search Handler: Vector Discovery Scaffold
The search route is included to show how KNN product discovery plugs into the same embedding pipeline. In this branch, api/search.js is scaffolded so the UI can be exercised while the product vector index module is developed in isolation:
```javascript
// api/search.js
'use strict';

module.exports = async (req, res) => {
  if (req.method !== 'GET') {
    res.status(405).send('Method Not Allowed');
    return;
  }
  const q = (req.query && (req.query.q || req.query.query)) || '';
  // TODO: implement RedisVL vector KNN search when Redis is configured.
  res.status(200).json({ query: q, results: [], pending: true });
};
```
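Once a product index exists, the handler would issue the same kind of KNN query string the chat cache uses. A sketch of the query construction only (the field and prefilter defaults are assumptions, not code from the project); the prefilter slot is where RediSearch lets you combine structured filters with vector similarity:

```javascript
// Build a RediSearch KNN query for top-k product discovery.
// prefilter '*' matches all documents; replacing it with a filter such as
// '@price:[0 1000]' restricts the KNN search to matching products.
function buildKnnQuery({ k = 5, field = 'embedding', prefilter = '*' } = {}) {
  return `${prefilter}=>[KNN ${k} @${field} $B AS score]`;
}

buildKnnQuery({ k: 3 }); // '*=>[KNN 3 @embedding $B AS score]'
```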
Each chat turn follows the same loop: extract the last user message, embed it, query Redis for the nearest neighbor, stream a cached answer when the cosine distance is within the threshold, or stream a fresh model answer and write it back as a new HASH with a TTL. This turns every computation into a reusable asset and continuously lowers average latency and cost as traffic grows. Because thresholds, embedding dimension, and TTL are controlled by environment variables, deployments can tune strictness and freshness without code changes.
Running and Configuration
Use npm run dev:vercel to run both the React app and the serverless API locally via the Vercel CLI. Configuration is driven entirely by environment variables:

- OPENAI_API_KEY — embeddings and chat completions
- REDIS_URL and REDIS_PASSWORD (or embed credentials in the URL) — connection to Redis 8 or Redis Stack
- SEMANTIC_DISTANCE_THRESHOLD — cache-hit strictness (default 0.1)
- CACHE_TTL — cache entry lifetime in seconds (default 86400)
- EMBEDDING_DIM — must match the embedding model's output dimension (default 1536)

In production, deploy the same serverless handlers and provide the same environment variables.
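For local development, these variables can live in an env file; a sketch with placeholder values only (the variable names come from the app, the values here are illustrative):

```
# .env — placeholder values, replace with your own
OPENAI_API_KEY=sk-...
REDIS_URL=rediss://default:<password>@<host>:<port>
SEMANTIC_DISTANCE_THRESHOLD=0.1
CACHE_TTL=86400
EMBEDDING_DIM=1536
```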
Xbeat uses Redis to decide in real time whether to answer from memory or to compute a new response. The vector‑addressable semantic cache turns repeated or near‑repeated questions into instantaneous streams, while misses still benefit from structured write‑back that improves future latency. The result is an application where the data layer is the engine for retrieval, grounding, and reuse. Answers get smarter because they are grounded, and they get faster because every answer becomes a new shard of reusable, semantically searchable knowledge.