In today’s world of digital applications and real-time insights, raw data is everywhere—user interactions, third-party APIs, internal systems, logs, surveys—you name it. But raw data isn’t very useful on its own.
To make that data meaningful, actionable, and ready for use (by humans or AI), we need data pipelines.
Whether you're building an analytics dashboard, powering a chatbot, or just making sure your backend can handle user inputs and external data smoothly, data pipelines are the behind-the-scenes workhorses that make it happen.
Let’s break down exactly what a pipeline is, why it’s useful, and how you can write one correctly—with a real-life example from a legal-tech app built in Next.js.
🚰 What Is a Data Pipeline?
A data pipeline is a structured series of steps that automate the flow of data from one system to another—often involving transformation, enrichment, and storage along the way.
At its core, a pipeline does three things:
- Extract – Get data from one or more sources (API, form input, DB, etc.).
- Transform – Clean it, summarize it, or convert it into another format.
- Load – Store the processed data into a database or system for future use.
This model is commonly known as ETL (Extract, Transform, Load).
🚀 Why Do We Need Data Pipelines?
Here’s why data pipelines are essential in any scalable or data-driven app:
- Automation: Removes the need for manual intervention in repetitive tasks.
- Scalability: Can handle large or growing data flows over time.
- Reliability: Ensures data gets from point A to point B consistently.
- Real-time Experience: Enables real-time features like chat, case tracking, live dashboards, etc.
- Separation of Concerns: Each step (input, API call, transformation, save) is modular and debuggable.
Imagine a legal app that collects user case descriptions, sends them to a summarizer, and stores structured legal insights. Without a pipeline, you'd have scattered logic and manual dependencies. With a pipeline? Clean, composable, and repeatable flow.
🧩 What Does a Data Pipeline Look Like?
Here’s a simplified flow:
graph TD
A[User Input] --> B[Input Validation]
B --> C[External API Call]
C --> D[Data Transformation / AI Processing]
D --> E[Database Save]
E --> F[Send Response]
Each of these steps may involve retries, error handling, and conditionals, but conceptually this is what most pipelines do.
✍️ A Real Example: Legal Case Pipeline in Next.js
Let’s say you’re building a legal chatbot where a user submits a case description.
Here’s what’s happening in your code (summarized for readability):
✅ Step 1: User Input
const { description, opponent, userId } = req.body;
- User fills out a legal intake form.
- Backend receives the input via an API endpoint.
🔍 Step 2: Get Relevant Cases via API
const docList = await getFromKanoon(description);
- You send this case description to the Indian Kanoon API.
- It returns a list of relevant legal documents (real court cases, judgments).
🧠 Step 3: Transform Data via AI
const processedJudgment = await runSummarizerChain({
judgment: selectedDoc.content,
});
- You take the top matching document.
- Run it through an AI model (e.g., Langflow/Gemini) to summarize it.
- Extract structured details: facts, laws involved, judgment reasoning, etc.
💾 Step 4: Store the Processed Case
const activeCase = await prisma.activeCases.create({
data: {
userId,
description,
opponent,
summary: processedJudgment.summary,
lawsInvolved: processedJudgment.lawsInvolved,
judgment: processedJudgment.judgment,
todo: processedJudgment.todo,
},
});
- You save all the AI-processed, structured information into the database.
- This active case now becomes trackable, searchable, and viewable for the user.
📤 Step 5: Respond
return res.status(200).json({ message: "Case stored!", data: activeCase });
- You return the response to the frontend so it can update the UI.
🔐 Key Features of a Good Data Pipeline
When building pipelines like this, keep these best practices in mind:
Feature | Why It Matters |
---|---|
Idempotency | Re-running the same step shouldn’t duplicate data or break logic. |
Error Handling | Use try-catch blocks and log errors to debug issues later. |
Retries | For flaky external APIs (like Indian Kanoon), always retry failed calls. |
Logging | Add logs at each step to monitor pipeline behavior. |
Timeouts | Prevent endless hangs on API calls or long tasks. |
Validation | Validate inputs before triggering anything downstream. |
Security | Sanitize inputs, and secure API calls to prevent abuse or injection. |
🛠️ Tools That Help Build Pipelines
Here are some useful libraries and platforms to help build pipelines in modern web apps:
- Backend Frameworks: Next.js API routes, Express.js, NestJS
- Database Clients: Prisma, Sequelize, Mongoose
- Message Queues (for async pipelines): Redis, BullMQ, Kafka
- API Integrations: Axios, Fetch, OpenAPI
- Workflow Orchestration (advanced): Airflow, Temporal.io, n8n
🧠 Final Thoughts
Think of pipelines as the arteries of your application. They carry the lifeblood (data) from the outer edge (user or API) to the core süt4(DB or logic)—transforming and enriching it along the way.
The pipeline you’ve built for your legal-tech app is solid—it turns unstructured user input into a structured, intelligent, and storable legal case. That’s real power.
As your system scales, you might move some parts of this pipeline into background workers (like summarization) or queue-based architectures—but the foundational pattern will remain.
Here is the lifeline the MVP code for my Project
import { NextResponse } from 'next/server';
import { PrismaClient } from '@/generated/prisma';
import { extractSections } from './extractSections';
const prisma = new PrismaClient();
const LANGFLOW_API_URL = 'https://api.langflow.astra.datastax.com/lf/043396b0-e82a-4e0f-aca3-ad6828b04b34/api/v1/run/2c978a87-7226-43c1-bccd-ca6082257444';
const FINAL_RESPONSE_URL = 'https://api.langflow.astra.datastax.com/lf/043396b0-e82a-4e0f-aca3-ad6828b04b34/api/v1/run/d3c72969-4e3e-464c-afbe-484d3243af21';
const LANGFLOW_TOKEN = process.env.FETCHING_DOCS_LANGFLOW_TOKEN;
const FINAL_RESPONSE_TOKEN = process.env.FINAL_RESPONSE_LANGFLOW_TOKEN;
const INDIAN_KANOON_API_KEY = process.env.KANOON_API_KEY;
export async function POST(request: Request) {
try {
const { description, userId, caseId } = await request.json();
console.log("[1] Backend - Received:", { description, userId, caseId });
// Validate inputs
if (!description) {
return NextResponse.json({ error: 'Description is required' }, { status: 400 });
}
if (!caseId) {
return NextResponse.json({ error: 'Case ID is required' }, { status: 400 });
}
// Verify case exists and belongs to user
const caseRecord = await prisma.case.findUnique({
where: { id: caseId, userId: userId || undefined },
});
if (!caseRecord) {
return NextResponse.json({ error: 'Case not found or unauthorized' }, { status: 404 });
}
// Step 1: Fetch initial Langflow response for keywords
const payload = {
input_value: description,
output_type: "chat",
input_type: "chat",
session_id: "user_1",
};
console.log("[2] Backend - Sending document fetching payload to Langflow:", payload);
const langflowRes = await fetch(LANGFLOW_API_URL, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'Authorization': `Bearer ${LANGFLOW_TOKEN}`,
},
body: JSON.stringify(payload),
});
if (!langflowRes.ok) {
console.error("[3] Backend - Langflow API error:", langflowRes.status, langflowRes.statusText);
throw new Error('Langflow API call failed');
}
const langflowData = await langflowRes.json();
const langflowMessage = langflowData.outputs?.[0]?.outputs?.[0]?.results?.message?.text;
console.log("[4] Backend - Langflow raw output:", langflowMessage);
let parsedResponse;
try {
const cleanJson = langflowMessage.replace(/```
json\n|\n
```/g, '').trim();
parsedResponse = JSON.parse(cleanJson);
} catch (error) {
console.error("[5] Backend - Error parsing Langflow response:", error);
throw new Error('Failed to parse Langflow response');
}
const caseOneLiner = parsedResponse.query_keywords?.[0] ?? '';
console.log("[6] Backend - Extracted keyword/query:", caseOneLiner);
// Step 2: Search Indian Kanoon API
const searchQuery = encodeURIComponent(caseOneLiner);
const kanoonURL = `https://api.indiankanoon.org/search/?formInput=${searchQuery}&pagenum=0`;
console.log("[7] Backend - Final Kanoon URL:", kanoonURL);
const kanoonRes = await fetch(kanoonURL, {
method: 'POST',
headers: {
'Authorization': `Token ${INDIAN_KANOON_API_KEY}`,
},
});
if (!kanoonRes.ok) {
console.error("[8] Backend - Indian Kanoon API error:", kanoonRes.status, kanoonRes.statusText);
throw new Error('Indian Kanoon API call failed');
}
const kanoonData = await kanoonRes.json();
const docs = kanoonData?.docs.slice(0, 2) ?? [];
const extractedDocs = docs.map((doc: any) => ({
title: doc.title,
headline: doc.headline,
docId: doc.tid,
}));
console.log("[9] Backend - Extracted Docs:", extractedDocs);
// Step 3: Process each document independently
const processedDocs = [];
for (const doc of extractedDocs) {
const { docId, title } = doc;
console.log("[10] Backend - Processing document ID:", docId);
const existingDoc = await prisma.extractedDoc.findUnique({
where: { docId },
select: { docId, title },
});
if (existingDoc) {
console.log("[11] Backend - Document already exists in database, skipping processing:", docId);
processedDocs.push({
docId: existingDoc.docId,
title: existingDoc.title,
});
continue;
}
// Fetch document content from Indian Kanoon
const kanoonDocRes = await fetch(`https://api.indiankanoon.org/doc/${docId}/`, {
method: 'POST',
headers: {
'Authorization': `Token ${INDIAN_KANOON_API_KEY}`,
'Accept': 'text/html',
},
});
if (!kanoonDocRes.ok) {
console.error("[12] Backend - Indian Kanoon document fetch error:", kanoonDocRes.status, kanoonDocRes.statusText);
continue; // Skip to next document
}
const rawContent = await kanoonDocRes.text();
const sections = extractSections(rawContent);
console.log("[13] Backend - Extracted Sections:", sections);
// Step 4: Generate HTML summary using final Langflow API
if (!FINAL_RESPONSE_TOKEN) {
throw new Error('Final response token is not set');
}
const finalPayload = {
input_value: JSON.stringify({ docId, sections, description }),
output_type: "chat",
input_type: "chat",
session_id: "user_1",
};
console.log("[14] Backend - Sending summary payload to Langflow:", finalPayload);
const finalLangflowRes = await fetch(FINAL_RESPONSE_URL, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'Authorization': `Bearer ${FINAL_RESPONSE_TOKEN}`,
},
body: JSON.stringify(finalPayload),
});
if (!finalLangflowRes.ok) {
console.error("[15] Backend - Final Langflow API error:", finalLangflowRes.status, finalLangflowRes.statusText);
continue; // Skip to next document
}
const finalLangflowData = await finalLangflowRes.json();
const finalMessage = finalLangflowData.outputs?.[0]?.outputs?.[0]?.results?.message?.text;
console.log("[16] Backend - Final Langflow raw output:", finalMessage);
let summaryResponse;
try {
const cleanSummaryJson = finalMessage.replace(/```
json\n|\n
```/g, '').trim();
summaryResponse = JSON.parse(cleanSummaryJson);
} catch (error) {
console.error("[17] Backend - Error parsing final Langflow response:", error);
continue; // Skip to next document
}
const { title: summaryTitle, aiSummary } = summaryResponse;
console.log("[18] Backend - Parsed summary:", { docId, summaryTitle });
// Step 5: Save to Prisma database
try {
const savedDoc = await prisma.extractedDoc.upsert({
where: { docId: docId },
update: {
title: summaryTitle,
rawContent,
aiSummary,
caseId,
},
create: {
docId: docId,
title: summaryTitle,
rawContent,
aiSummary,
caseId,
},
});
console.log("[19] Backend - Saved document to database:", savedDoc);
processedDocs.push({ docId: docId, title: summaryTitle, aiSummary });
} catch (prismaError) {
console.error("[20] Backend - Prisma save error:", prismaError);
continue; // Skip to next document
}
}
// Step 6: Return response
if (processedDocs.length === 0) {
return NextResponse.json(
{ success: false, message: 'No documents were successfully processed', docIds: [] },
{ status: 500 }
);
}
return NextResponse.json({
success: true,
message: `Successfully processed or retrieved ${processedDocs.length} document(s)`,
docIds: processedDocs.map(doc => doc.docId),
documents: processedDocs,
});
} catch (error) {
console.error("[🔥] Backend - Error in fetch-docs route:", error);
return NextResponse.json(
{ error: 'Internal server error', docIds: [] },
{ status: 500 }
);
} finally {
await prisma.$disconnect();
}
}