This demonstrates storing chat history between requests and using it to give the model context for new responses.
Most of the complex logic here is split between chat_app.py, which streams the response to the browser, and chat_app.ts, which renders messages in the browser.
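Before diving into the full app, here is a minimal sketch (not part of the example files, using the same `openai:gpt-4o` model purely for illustration) of the core idea: capture the messages from one run and pass them back in as `message_history` so the next run sees the earlier exchange.

```python
from pydantic_ai import Agent

agent = Agent('openai:gpt-4o')

# first run: no history yet
first = agent.run_sync('Tell me a joke.')

# pass the first exchange back in so the model can refer to it;
# the app below does the same thing, but persists the messages in SQLite
second = agent.run_sync('Explain that joke.', message_history=first.new_messages())
```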
chat_app.py

```python
from __future__ import annotations as _annotations

import asyncio
import json
import sqlite3
from collections.abc import AsyncIterator
from concurrent.futures.thread import ThreadPoolExecutor
from contextlib import asynccontextmanager
from dataclasses import dataclass
from datetime import datetime, timezone
from functools import partial
from pathlib import Path
from typing import Annotated, Any, Callable, Literal, TypeVar

import fastapi
import logfire
from fastapi import Depends, Request
from fastapi.responses import FileResponse, Response, StreamingResponse
from typing_extensions import LiteralString, ParamSpec, TypedDict

from pydantic_ai import Agent
from pydantic_ai.exceptions import UnexpectedModelBehavior
from pydantic_ai.messages import (
    ModelMessage,
    ModelMessagesTypeAdapter,
    ModelRequest,
    ModelResponse,
    TextPart,
    UserPromptPart,
)

# 'if-token-present' means nothing will be sent (and the example will work) if you don't have logfire configured
logfire.configure(send_to_logfire='if-token-present')

agent = Agent('openai:gpt-4o', instrument=True)
THIS_DIR = Path(__file__).parent


@asynccontextmanager
async def lifespan(_app: fastapi.FastAPI):
    async with Database.connect() as db:
        yield {'db': db}


app = fastapi.FastAPI(lifespan=lifespan)
logfire.instrument_fastapi(app)


@app.get('/')
async def index() -> FileResponse:
    return FileResponse((THIS_DIR / 'chat_app.html'), media_type='text/html')


@app.get('/chat_app.ts')
async def main_ts() -> FileResponse:
    """Get the raw typescript code, it's compiled in the browser, forgive me."""
    return FileResponse((THIS_DIR / 'chat_app.ts'), media_type='text/plain')


async def get_db(request: Request) -> Database:
    return request.state.db


@app.get('/chat/')
async def get_chat(database: Database = Depends(get_db)) -> Response:
    msgs = await database.get_messages()
    return Response(
        b'\n'.join(json.dumps(to_chat_message(m)).encode('utf-8') for m in msgs),
        media_type='text/plain',
    )


class ChatMessage(TypedDict):
    """Format of messages sent to the browser."""

    role: Literal['user', 'model']
    timestamp: str
    content: str


def to_chat_message(m: ModelMessage) -> ChatMessage:
    first_part = m.parts[0]
    if isinstance(m, ModelRequest):
        if isinstance(first_part, UserPromptPart):
            assert isinstance(first_part.content, str)
            return {
                'role': 'user',
                'timestamp': first_part.timestamp.isoformat(),
                'content': first_part.content,
            }
    elif isinstance(m, ModelResponse):
        if isinstance(first_part, TextPart):
            return {
                'role': 'model',
                'timestamp': m.timestamp.isoformat(),
                'content': first_part.content,
            }
    raise UnexpectedModelBehavior(f'Unexpected message type for chat app: {m}')


@app.post('/chat/')
async def post_chat(
    prompt: Annotated[str, fastapi.Form()], database: Database = Depends(get_db)
) -> StreamingResponse:
    async def stream_messages():
        """Streams new line delimited JSON `Message`s to the client."""
        # stream the user prompt so that can be displayed straight away
        yield (
            json.dumps(
                {
                    'role': 'user',
                    'timestamp': datetime.now(tz=timezone.utc).isoformat(),
                    'content': prompt,
                }
            ).encode('utf-8')
            + b'\n'
        )
        # get the chat history so far to pass as context to the agent
        messages = await database.get_messages()
        # run the agent with the user prompt and the chat history
        async with agent.run_stream(prompt, message_history=messages) as result:
            async for text in result.stream(debounce_by=0.01):
                # text here is a `str` and the frontend wants
                # JSON encoded ModelResponse, so we create one
                m = ModelResponse(parts=[TextPart(text)], timestamp=result.timestamp())
                yield json.dumps(to_chat_message(m)).encode('utf-8') + b'\n'

        # add new messages (e.g. the user prompt and the agent response in this case) to the database
        await database.add_messages(result.new_messages_json())

    return StreamingResponse(stream_messages(), media_type='text/plain')


P = ParamSpec('P')
R = TypeVar('R')


@dataclass
class Database:
    """Rudimentary database to store chat messages in SQLite.

    The SQLite standard library package is synchronous, so we
    use a thread pool executor to run queries asynchronously.
    """

    con: sqlite3.Connection
    _loop: asyncio.AbstractEventLoop
    _executor: ThreadPoolExecutor

    @classmethod
    @asynccontextmanager
    async def connect(
        cls, file: Path = THIS_DIR / '.chat_app_messages.sqlite'
    ) -> AsyncIterator[Database]:
        with logfire.span('connect to DB'):
            loop = asyncio.get_event_loop()
            executor = ThreadPoolExecutor(max_workers=1)
            con = await loop.run_in_executor(executor, cls._connect, file)
            slf = cls(con, loop, executor)
        try:
            yield slf
        finally:
            await slf._asyncify(con.close)

    @staticmethod
    def _connect(file: Path) -> sqlite3.Connection:
        con = sqlite3.connect(str(file))
        con = logfire.instrument_sqlite3(con)
        cur = con.cursor()
        cur.execute(
            'CREATE TABLE IF NOT EXISTS messages (id INT PRIMARY KEY, message_list TEXT);'
        )
        con.commit()
        return con

    async def add_messages(self, messages: bytes):
        await self._asyncify(
            self._execute,
            'INSERT INTO messages (message_list) VALUES (?);',
            messages,
            commit=True,
        )
        await self._asyncify(self.con.commit)

    async def get_messages(self) -> list[ModelMessage]:
        c = await self._asyncify(
            self._execute, 'SELECT message_list FROM messages order by id'
        )
        rows = await self._asyncify(c.fetchall)
        messages: list[ModelMessage] = []
        for row in rows:
            messages.extend(ModelMessagesTypeAdapter.validate_json(row[0]))
        return messages

    def _execute(
        self, sql: LiteralString, *args: Any, commit: bool = False
    ) -> sqlite3.Cursor:
        cur = self.con.cursor()
        cur.execute(sql, args)
        if commit:
            self.con.commit()
        return cur

    async def _asyncify(
        self, func: Callable[P, R], *args: P.args, **kwargs: P.kwargs
    ) -> R:
        return await self._loop.run_in_executor(  # type: ignore
            self._executor,
            partial(func, **kwargs),
            *args,  # type: ignore
        )


if __name__ == '__main__':
    import uvicorn

    uvicorn.run(
        'pydantic_ai_examples.chat_app:app', reload=True, reload_dirs=[str(THIS_DIR)]
    )
```
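The `__main__` block above serves the app with uvicorn, so `python -m pydantic_ai_examples.chat_app` runs it on the default port 8000. Both the GET and POST `/chat/` endpoints return newline-delimited JSON, one `ChatMessage` per line. As a rough sketch (assuming the app is running locally; the browser frontend below does the equivalent with `fetch`), the stream could be consumed like this:

```python
import json

import httpx

# hypothetical standalone client, assuming the app is running on localhost:8000
with httpx.Client(base_url='http://localhost:8000', timeout=None) as client:
    with client.stream('POST', '/chat/', data={'prompt': 'hello'}) as response:
        for line in response.iter_lines():
            if line:
                # each line is a ChatMessage; the model message is re-sent as it
                # grows, and the frontend deduplicates messages by timestamp
                message = json.loads(line)
                print(message['role'], message['content'])
```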
Simple HTML page to render the app:
chat_app.html
```html
<!DOCTYPE html>
<html lang="en">
<head>
  <meta charset="UTF-8">
  <meta name="viewport" content="width=device-width, initial-scale=1.0">
  <title>Chat App</title>
  <link href="https://cdn.jsdelivr.net/npm/bootstrap@5.3.3/dist/css/bootstrap.min.css" rel="stylesheet">
  <style>
    main {
      max-width: 700px;
    }
    #conversation .user::before {
      content: 'You asked: ';
      font-weight: bold;
      display: block;
    }
    #conversation .model::before {
      content: 'AI Response: ';
      font-weight: bold;
      display: block;
    }
    #spinner {
      opacity: 0;
      transition: opacity 500ms ease-in;
      width: 30px;
      height: 30px;
      border: 3px solid #222;
      border-bottom-color: transparent;
      border-radius: 50%;
      animation: rotation 1s linear infinite;
    }
    @keyframes rotation {
      0% { transform: rotate(0deg); }
      100% { transform: rotate(360deg); }
    }
    #spinner.active {
      opacity: 1;
    }
  </style>
</head>
<body>
  <main class="border rounded mx-auto my-5 p-4">
    <h1>Chat App</h1>
    <p>Ask me anything...</p>
    <div id="conversation" class="px-2"></div>
    <div class="d-flex justify-content-center mb-3">
      <div id="spinner"></div>
    </div>
    <form method="post">
      <input id="prompt-input" name="prompt" class="form-control"/>
      <div class="d-flex justify-content-end">
        <button class="btn btn-primary mt-2">Send</button>
      </div>
    </form>
    <div id="error" class="d-none text-danger">
      Error occurred, check the browser developer console for more information.
    </div>
  </main>
</body>
</html>
<script src="https://cdnjs.cloudflare.com/ajax/libs/typescript/5.6.3/typescript.min.js" crossorigin="anonymous" referrerpolicy="no-referrer"></script>
<script type="module">
  // to let me write TypeScript, without adding the burden of npm we do a dirty, non-production-ready hack
  // and transpile the TypeScript code in the browser
  // this is (arguably) a neat demo trick, but not suitable for production!
  async function loadTs() {
    const response = await fetch('/chat_app.ts');
    const tsCode = await response.text();
    const jsCode = window.ts.transpile(tsCode, {target: "es2015"});
    let script = document.createElement('script');
    script.type = 'module';
    script.text = jsCode;
    document.body.appendChild(script);
  }

  loadTs().catch((e) => {
    console.error(e);
    document.getElementById('error').classList.remove('d-none');
    document.getElementById('spinner').classList.remove('active');
  });
</script>
```
TypeScript to handle rendering the messages. To keep this simple (and at the risk of offending frontend developers), the TypeScript code is passed to the browser as plain text and transpiled in the browser.
chat_app.ts
```ts
// BIG FAT WARNING: to avoid the complexity of npm, this typescript is compiled in the browser
// there's currently no static type checking
import { marked } from 'https://cdnjs.cloudflare.com/ajax/libs/marked/15.0.0/lib/marked.esm.js'

const convElement = document.getElementById('conversation')

const promptInput = document.getElementById('prompt-input') as HTMLInputElement
const spinner = document.getElementById('spinner')

// stream the response and render messages as each chunk is received
// data is sent as newline-delimited JSON
async function onFetchResponse(response: Response): Promise<void> {
  let text = ''
  let decoder = new TextDecoder()
  if (response.ok) {
    const reader = response.body.getReader()
    while (true) {
      const {done, value} = await reader.read()
      if (done) {
        break
      }
      text += decoder.decode(value)
      addMessages(text)
      spinner.classList.remove('active')
    }
    addMessages(text)
    promptInput.disabled = false
    promptInput.focus()
  } else {
    const text = await response.text()
    console.error(`Unexpected response: ${response.status}`, {response, text})
    throw new Error(`Unexpected response: ${response.status}`)
  }
}

// The format of messages, this matches pydantic-ai both for brevity and understanding
// in production, you might not want to keep this format all the way to the frontend
interface Message {
  role: string
  content: string
  timestamp: string
}

// take raw response text and render messages into the `#conversation` element
// Message timestamp is assumed to be a unique identifier of a message, and is used to deduplicate
// hence you can send data about the same message multiple times, and it will be updated
// instead of creating new message elements
function addMessages(responseText: string) {
  const lines = responseText.split('\n')
  const messages: Message[] = lines.filter(line => line.length > 1).map(j => JSON.parse(j))
  for (const message of messages) {
    // we use the timestamp as a crude element id
    const {timestamp, role, content} = message
    const id = `msg-${timestamp}`
    let msgDiv = document.getElementById(id)
    if (!msgDiv) {
      msgDiv = document.createElement('div')
      msgDiv.id = id
      msgDiv.title = `${role} at ${timestamp}`
      msgDiv.classList.add('border-top', 'pt-2', role)
      convElement.appendChild(msgDiv)
    }
    msgDiv.innerHTML = marked.parse(content)
  }
  window.scrollTo({top: document.body.scrollHeight, behavior: 'smooth'})
}

function onError(error: any) {
  console.error(error)
  document.getElementById('error').classList.remove('d-none')
  document.getElementById('spinner').classList.remove('active')
}

async function onSubmit(e: SubmitEvent): Promise<void> {
  e.preventDefault()
  spinner.classList.add('active')
  const body = new FormData(e.target as HTMLFormElement)

  promptInput.value = ''
  promptInput.disabled = true

  const response = await fetch('/chat/', {method: 'POST', body})
  await onFetchResponse(response)
}

// call onSubmit when the form is submitted (e.g. user clicks the send button or hits Enter)
document.querySelector('form').addEventListener('submit', (e) => onSubmit(e).catch(onError))

// load messages on page load
fetch('/chat/').then(onFetchResponse).catch(onError)
```