JUNE 26 2025
Bringing agents to life: real-time event streaming with Modus
Learn how to build intelligent agents that communicate in real-time through GraphQL subscriptions with Modus stateful agents.

Neo's eyes snap open in his pod. The machines have evolved. Agent Smith isn't just replicating anymore—he's learning, adapting, becoming something more dangerous than the resistance has ever faced. But this time, we're not going in blind.
What if you could subscribe to your agents via an SSE stream? Watch their thoughts, monitor their progress, and get instant alerts when something critical happens, all through the same GraphQL interface you already know and love.
With Modus agents and real-time event streaming, you can finally move beyond silent AI workers to intelligent systems that communicate, alert, and collaborate in real-time. This isn't just about monitoring—it's about creating agents that feel genuinely alive and responsive, delivering user experiences that feel magical while maintaining developer productivity.
While we'll explore these concepts through a Matrix-themed surveillance system, the same patterns power real-world applications your users actually need: sales agents that provide live prospect research updates, marketing agents that stream campaign optimization insights, customer service agents that offer real-time sentiment analysis, and support agents that coordinate across teams while keeping users informed of every step.
Let's build a Matrix surveillance system where agents monitor anomalous activity, provide real-time progress updates, and create engaging user experiences through instant threat alerts.
What are Modus agents?
Before we dive into the code, let's understand what makes Modus agents special. Unlike traditional AI tools that process requests and forget everything afterward, Modus agents are persistent processes that maintain memory across interactions.
Think of an agent as a digital teammate that:
- Remembers conversations and context across sessions
- Maintains operational state even through system restarts
- Streams real-time events about their progress and discoveries
- Coordinates with other agents through shared knowledge
- Develops personality through consistent themed interactions
When you create an agent, you're not just calling an API—you're bringing a persistent intelligence to life that can work autonomously, build relationships, and communicate naturally through real-time event streams.
The silent agent problem
Most AI agents today operate in complete darkness. You deploy them, send tasks, and hope they're working:
// Traditional agent interaction
const result = await agent.processTask('Monitor Agent Smith activity')
// Wait... and hope something is happening
// No progress updates, no intermediate alerts
// Agent could be stuck, compromised, or detecting critical threats
// Finally get result (or timeout)
console.log(result) // "Monitoring complete" - but what happened?
The fundamental problems affect both developers and users:
Developer Pain:
- No real-time alerts: Critical threats only surface when tasks complete
- Lost operational intelligence: Can't see agent reasoning or decision-making
- Silent failures: Problems only discovered after mission failure
- No collaboration: Agents can't coordinate or share intelligence
User Experience Problems:
- Loading screens everywhere: Users wait without knowing what's happening
- No progress feedback: Tasks feel slow even when progressing normally
- Anxiety and uncertainty: Users don't know if their request is being processed
- Generic, robotic interactions: AI feels like a tool, not a helpful assistant
You're managing ghost processes—intelligent systems that exist but can't communicate their discoveries or call for help when needed.
What if agents could talk back?
Imagine if your agents could stream their thoughts, discoveries, and alerts in real-time through GraphQL subscriptions:
subscription {
  agentEvent(agentId: "surveillance_neo_001") {
    name
    data
    timestamp
  }
}
Your agents become living intelligence sources that transform user experiences:
For Developers:
- Alert instantly when threats are detected
- Share reasoning as they analyze patterns
- Coordinate automatically with other agents and systems
- Provide detailed progress on long-running operations
For Users:
- See progress in real-time instead of staring at loading screens
- Get personality-driven updates that feel engaging and human
- Receive instant notifications when important events happen
- Experience responsive interactions that build trust and engagement
Building intelligent surveillance agents
The machines are learning faster than ever. We need agents that can think, adapt, and warn us before it's too late. Let's create agents that monitor the Matrix for anomalous Agent activity and alert the resistance through real-time event streams.
Setting up your agent infrastructure
First, we need to configure Modus (https://docs.hypermode.com/modus/overview) to connect to the AI models and knowledge storage that power our surveillance network. The modus.json configuration file tells Modus how to connect to external services:
{
  "models": {
    "threat-analyzer": {
      "sourceModel": "gpt-4o-mini",
      "connection": "openai"
    }
  },
  "connections": {
    "dgraph": {
      "type": "dgraph",
      "connString": "dgraph://your-graph.hypermode.host:443?sslmode=verify-ca&bearertoken={{API_KEY}}"
    }
  }
}
This configuration sets up two key components:
- Models: The threat-analyzer model gives our agents access to GPT-4o-mini for intelligent threat analysis
- Connections: The dgraph connection provides a knowledge graph where agents can store and query historical intelligence data
You'll also need to set your API keys in your environment:
# .env.dev.local
MODUS_OPENAI_API_KEY=your_openai_api_key_here
MODUS_DGRAPH_API_KEY=your_dgraph_api_key_here
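The variable names above appear to follow a MODUS_<CONNECTION>_<PLACEHOLDER> pattern: the connection name from modus.json plus the {{API_KEY}} placeholder it uses. A minimal Go sketch of that mapping follows. Note that envVarName is a hypothetical helper for illustration, not part of the Modus SDK, and the upper-casing and hyphen handling are assumptions:

```go
package main

import (
	"fmt"
	"strings"
)

// envVarName is a hypothetical helper (not an SDK function). It mirrors the
// naming pattern seen in .env.dev.local: MODUS_<CONNECTION>_<PLACEHOLDER>.
func envVarName(connection, placeholder string) string {
	// Assumption: names are upper-cased and hyphens become underscores.
	name := strings.ToUpper(strings.ReplaceAll(connection, "-", "_"))
	return "MODUS_" + name + "_" + strings.ToUpper(placeholder)
}

func main() {
	fmt.Println(envVarName("openai", "API_KEY")) // MODUS_OPENAI_API_KEY
	fmt.Println(envVarName("dgraph", "API_KEY")) // MODUS_DGRAPH_API_KEY
}
```

If a secret is not picked up at runtime, comparing the expected name against the connection name in modus.json is a quick first check.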
Creating your first surveillance agent
Now let's create a surveillance agent that embeds the essential agent framework while adding our specific intelligence capabilities:
import (
	"encoding/json"
	"fmt" // needed for fmt.Errorf and fmt.Sprintf in the handlers below
	"time"

	"github.com/hypermodeinc/modus/sdk/go/pkg/agents"
	"github.com/hypermodeinc/modus/sdk/go/pkg/dgraph"
	"github.com/hypermodeinc/modus/sdk/go/pkg/models" // used by generateThreatAnalysis (not shown)
)

type MatrixSurveillanceAgent struct {
	agents.AgentBase // Provides all the agent infrastructure

	MonitoredSectors  []string `json:"monitored_sectors"`
	ThreatDatabase    []string `json:"threat_database"`
	AlertThreshold    int      `json:"alert_threshold"`
	ActiveIncidents   int      `json:"active_incidents"`
	LastThreatLevel   float64  `json:"last_threat_level"`
	OperationalStatus string   `json:"operational_status"`
}

func (m *MatrixSurveillanceAgent) Name() string {
	return "MatrixSurveillanceAgent"
}
The agent embeds agents.AgentBase, which provides all the infrastructure for state management, event streaming, and persistence. Your surveillance data (monitored sectors, threat intelligence, incident tracking) lives as fields in the struct, automatically preserved across all interactions.
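How those fields survive a restart depends on the SDK version you are using. The sketch below assumes the runtime asks the agent for a serialized snapshot and hands it back on resume. The GetState and SetState names and signatures are assumptions to verify against the SDK, and surveillanceState is a stand-in type so the example runs without Modus:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// surveillanceState is a stand-in for the agent's fields. Assumption: the
// real agent would embed agents.AgentBase and keep the same JSON tags.
type surveillanceState struct {
	MonitoredSectors  []string `json:"monitored_sectors"`
	ThreatDatabase    []string `json:"threat_database"`
	AlertThreshold    int      `json:"alert_threshold"`
	ActiveIncidents   int      `json:"active_incidents"`
	LastThreatLevel   float64  `json:"last_threat_level"`
	OperationalStatus string   `json:"operational_status"`
}

// GetState serializes the state to JSON. It returns nil if marshaling fails.
func (s *surveillanceState) GetState() *string {
	b, err := json.Marshal(s)
	if err != nil {
		return nil
	}
	out := string(b)
	return &out
}

// SetState restores the state from a snapshot. A nil or invalid snapshot
// leaves the current state unchanged.
func (s *surveillanceState) SetState(data *string) {
	if data == nil {
		return
	}
	var restored surveillanceState
	if err := json.Unmarshal([]byte(*data), &restored); err != nil {
		return
	}
	*s = restored
}

func main() {
	before := &surveillanceState{
		MonitoredSectors: []string{"Downtown Loop", "Server Farms"},
		AlertThreshold:   7,
		ActiveIncidents:  2,
	}
	snapshot := before.GetState()

	after := &surveillanceState{} // simulates a fresh instance after a restart
	after.SetState(snapshot)
	fmt.Println(after.AlertThreshold, after.ActiveIncidents, len(after.MonitoredSectors))
}
```

Keeping the JSON tags stable matters here: renaming a tag would silently drop that field when an older snapshot is restored.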
Agent initialization and awakening
When an agent first comes online, it needs to establish its operational parameters and announce its presence to the resistance:
func (m *MatrixSurveillanceAgent) OnInitialize() error {
	m.MonitoredSectors = []string{"Downtown Loop", "Financial District", "Server Farms"}
	m.AlertThreshold = 7 // Threat level that triggers alerts
	m.ActiveIncidents = 0
	m.LastThreatLevel = 0.0
	m.OperationalStatus = "awakening"

	// Emit initialization event with personality
	m.PublishEvent(map[string]interface{}{
		"eventType":      "agent_awakened",
		"message":        "Neo has entered the Matrix. Surveillance protocols active.",
		"sectors":        m.MonitoredSectors,
		"status":         "operational",
		"personality":    "resistance_operative",
		"awakening_time": time.Now().Format("15:04:05"),
	})

	m.OperationalStatus = "operational"
	return nil
}
The PublishEvent method is where the magic happens: it streams real-time events to any GraphQL subscriptions listening to this agent. Users immediately see "Neo is awake and monitoring the Matrix" instead of wondering if their agent request worked.
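The subscription exposes a name field alongside data, so every event needs a stable name. One way to get that is a typed event that reports its own name. The sketch below assumes an event contract with an EventName method (defined locally here so the example runs without the SDK) and is worth checking against the PublishEvent signature in your SDK version. The envelope type only mimics the name/data shape the subscription returns:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// agentEvent is a local stand-in for the event contract (assumption).
type agentEvent interface {
	EventName() string
}

// agentAwakened carries the same fields as the awakening event above.
type agentAwakened struct {
	Message string   `json:"message"`
	Sectors []string `json:"sectors"`
	Status  string   `json:"status"`
}

// EventName supplies the value subscribers would see in the name field.
func (agentAwakened) EventName() string { return "agent_awakened" }

// envelope mimics the name/data pair exposed by the agentEvent subscription.
type envelope struct {
	Name string          `json:"name"`
	Data json.RawMessage `json:"data"`
}

// toEnvelope serializes an event into the name/data shape.
func toEnvelope(e agentEvent) (string, error) {
	data, err := json.Marshal(e)
	if err != nil {
		return "", err
	}
	out, err := json.Marshal(envelope{Name: e.EventName(), Data: data})
	if err != nil {
		return "", err
	}
	return string(out), nil
}

func main() {
	s, err := toEnvelope(agentAwakened{
		Message: "Neo has entered the Matrix.",
		Sectors: []string{"Downtown Loop"},
		Status:  "operational",
	})
	if err != nil {
		panic(err)
	}
	fmt.Println(s)
}
```

Typed events also give the compiler a chance to catch misspelled field names, which a map[string]interface{} payload cannot do.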
Deploying your surveillance network
Agents are created and managed through regular Modus functions that become part of your GraphQL API. These functions handle agent lifecycle operations:
// Register your agent type during initialization
func init() {
	agents.Register(&MatrixSurveillanceAgent{})
}

// Deploy a new surveillance agent - becomes a GraphQL mutation
func DeploySurveillanceAgent() (string, error) {
	agentInfo, err := agents.Start("MatrixSurveillanceAgent")
	if err != nil {
		return "", err
	}
	return agentInfo.Id, nil
}
When you call this function through GraphQL, it returns a unique agent ID that clients must store to communicate with the agent:
mutation {
deploySurveillanceAgent
}
Response:
{
"data": {
"deploySurveillanceAgent": "agent_neo_001"
}
}
Perfect! Agent Neo-001 is now active in the Matrix, and subscribers to agentEvent(agentId: "agent_neo_001") immediately receive the awakening event we configured.
Processing threats with institutional knowledge
When threat data arrives, agents analyze it using accumulated organizational knowledge and stream alerts. The machines are getting smarter, so we need our agents to remember every encounter:
func (m *MatrixSurveillanceAgent) OnReceiveMessage(
	msgName string, data *string) (*string, error) {
	switch msgName {
	case "analyze_threat":
		return m.analyzeThreatWithIntelligence(data)
	case "surveillance_sweep":
		return m.performSurveillanceSweep()
	case "get_status":
		return m.getOperationalStatus()
	default:
		return nil, fmt.Errorf("unknown directive: %s", msgName)
	}
}
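The handlers for surveillance_sweep and get_status are not shown in this post. As a rough idea of what a status handler could return, here is a sketch that formats a report from the agent's fields. The statusInput type, the statusReport name, and the report layout are all hypothetical, and the function is kept free of SDK types so it can be tested on its own:

```go
package main

import (
	"fmt"
	"strings"
)

// statusInput holds the fields a status report needs (hypothetical type).
type statusInput struct {
	Status          string
	Sectors         []string
	LastThreatLevel float64
	ActiveIncidents int
	ThreatsAnalyzed int
}

// statusReport renders a plain-text summary, one fact per line.
func statusReport(s statusInput) string {
	return fmt.Sprintf(
		"Status: %s\nMonitoring %d sectors: %s\nCurrent threat level: %.1f\nActive incidents: %d\nIntelligence database: %d threat patterns analyzed",
		s.Status, len(s.Sectors), strings.Join(s.Sectors, ", "),
		s.LastThreatLevel, s.ActiveIncidents, s.ThreatsAnalyzed,
	)
}

func main() {
	fmt.Println(statusReport(statusInput{
		Status:          "operational",
		Sectors:         []string{"Downtown Loop", "Financial District", "Server Farms"},
		LastThreatLevel: 1.0,
		ActiveIncidents: 2,
		ThreatsAnalyzed: 3,
	}))
}
```

Inside the agent, a get_status handler could build the input from its own fields and return a pointer to the resulting string.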
The core threat analysis method demonstrates real-time event streaming throughout a complex intelligence operation:
func (m *MatrixSurveillanceAgent) analyzeThreatWithIntelligence(data *string) (*string, error) {
	// 1. Stream initial processing event
	m.PublishEvent(map[string]interface{}{
		"eventType":             "threat_analysis_started",
		"message":               "Neo is scanning the Matrix for anomalies...",
		"user_friendly_message": "🔍 Starting threat analysis - connecting to the mainframe",
	})

	// 2. Parse threat data and query knowledge graph for context
	if data == nil {
		return nil, fmt.Errorf("analyze_threat requires threat data")
	}
	var threat ThreatData
	if err := json.Unmarshal([]byte(*data), &threat); err != nil {
		return nil, fmt.Errorf("invalid threat data: %w", err)
	}

	// Query historical patterns from institutional knowledge
	query := dgraph.NewQuery(`
		query getThreatContext($agentName: string) {
			threats(func: eq(agent_name, $agentName)) {
				threat_level
				behavior_pattern
				location
				timestamp
			}
		}
	`).WithVariable("$agentName", threat.AgentName)
	response, err := dgraph.ExecuteQuery("dgraph", query)
	if err != nil {
		return nil, err
	}

	// 3. Stream progress update
	m.PublishEvent(map[string]interface{}{
		"eventType":             "threat_analysis_progress",
		"progress":              0.6,
		"user_friendly_message": fmt.Sprintf("🔎 Analyzing %s patterns - almost done!", threat.AgentName),
		"matrix_data":           "green_code_cascading",
	})

	// 4. AI analysis with accumulated intelligence
	analysis, err := m.generateThreatAnalysis(threat, response.Json)
	if err != nil {
		return nil, err
	}

	// 5. Update agent state and determine alert level
	m.ThreatDatabase = append(m.ThreatDatabase, analysis)
	m.LastThreatLevel = float64(threat.ThreatLevel) / 10.0

	// 6. Stream appropriate alert based on threat level
	if threat.ThreatLevel >= m.AlertThreshold {
		m.ActiveIncidents++
		m.PublishEvent(map[string]interface{}{
			"eventType":          "matrix_threat_detected",
			"message":            fmt.Sprintf("ALERT: %s displaying anomalous behavior", threat.AgentName),
			"threat_level":       threat.ThreatLevel,
			"location":           threat.Location,
			"analysis":           analysis,
			"severity":           "CRITICAL",
			"user_notification":  fmt.Sprintf("🚨 Critical threat detected! %s is acting suspiciously in %s", threat.AgentName, threat.Location),
			"recommended_action": "Consider immediate defensive measures",
		})
	} else {
		m.PublishEvent(map[string]interface{}{
			"eventType":             "threat_analysis_complete",
			"message":               fmt.Sprintf("✅ Analysis complete for %s", threat.AgentName),
			"threat_level":          threat.ThreatLevel,
			"status":                "normal",
			"user_friendly_message": "Threat analyzed - all systems normal",
		})
	}

	result := fmt.Sprintf("Threat analysis complete. Alert level: %d", threat.ThreatLevel)
	return &result, nil
}
This method demonstrates the power of stateful agents with real-time communication. The agent maintains persistent memory of all threats analyzed, queries institutional knowledge for context, uses AI for intelligent analysis, and streams progress updates throughout the entire process.
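The alert decision in step 6 and the level normalization in step 5 are small enough to isolate and test without a running agent. A sketch of that logic as a pure function follows; classifyThreat is a hypothetical helper name, and the threshold of 7 in the usage matches the value set in OnInitialize:

```go
package main

import "fmt"

// classifyThreat returns the event name that would be published for a
// threat level, plus the level normalized to the 0.0-1.0 range.
func classifyThreat(threatLevel, alertThreshold int) (eventName string, normalized float64) {
	normalized = float64(threatLevel) / 10.0
	if threatLevel >= alertThreshold {
		return "matrix_threat_detected", normalized
	}
	return "threat_analysis_complete", normalized
}

func main() {
	for _, level := range []int{3, 9, 10} {
		name, norm := classifyThreat(level, 7)
		fmt.Printf("level %d -> %s (%.1f)\n", level, name, norm)
	}
}
```

Because the comparison is >=, a threat exactly at the threshold raises an alert, which is the behavior of the method above.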
Let's create functions to communicate with our surveillance agent:
type ThreatData struct {
	AgentName   string `json:"agent_name"`
	Location    string `json:"location"`
	Behavior    string `json:"behavior"`
	ThreatLevel int    `json:"threat_level"`
	Timestamp   string `json:"timestamp"`
}

func AnalyzeThreatData(agentId string, threatData ThreatData) (string, error) {
	threatJson, err := json.Marshal(threatData)
	if err != nil {
		return "", err
	}
	dataStr := string(threatJson)
	result, err := agents.SendMessage(agentId, "analyze_threat", agents.WithData(dataStr))
	if err != nil {
		return "", err
	}
	if result == nil {
		return "", fmt.Errorf("no response from surveillance agent")
	}
	return *result, nil
}

func GetSurveillanceStatus(agentId string) (string, error) {
	result, err := agents.SendMessage(agentId, "get_status")
	if err != nil {
		return "", err
	}
	if result == nil {
		return "", fmt.Errorf("no response from agent")
	}
	return *result, nil
}
The first threat emerges
Something's wrong in the Matrix. Agent Smith has been spotted exhibiting behavior patterns we've never seen before. Let's feed this intelligence to our surveillance network:
mutation {
  analyzeThreatData(
    agentId: "agent_neo_001"
    threatData: {
      agentName: "Smith"
      location: "Downtown Loop - Financial District"
      behavior: """
      Enhanced replication capabilities detected.
      Agent displaying ability to replicate across multiple sectors simultaneously.
      Viral propagation patterns suggest exponential growth potential.
      """
      threatLevel: 9
      timestamp: "2025-06-12T14:30:15Z"
    }
  )
}
Response:
{
"data": {
"analyzeThreatData": "Threat analysis complete. Alert level: 9"
}
}
The analysis is complete, but the real magic is happening in the event stream. Your surveillance agent is broadcasting live intelligence updates to anyone subscribed to its events.
Real-time event streaming with SSE
Modus agent events work with popular GraphQL client libraries like urql and graphql-sse. This setup uses Server-Sent Events (SSE), which run over plain HTTP and flow one way, from server to client. That fits agent event feeds well and avoids the connection upgrade and bidirectional channel that WebSockets require.
Set up your frontend to receive agent intelligence through Server-Sent Events:
"use client"
import { createClient, Provider, cacheExchange, fetchExchange, subscriptionExchange } from "urql"
import { createClient as createSSEClient } from "graphql-sse"

const sseClient = createSSEClient({
  url: "http://localhost:8686/graphql",
})

export const client = createClient({
  url: "http://localhost:8686/graphql",
  exchanges: [
    cacheExchange,
    fetchExchange,
    subscriptionExchange({
      forwardSubscription(operation) {
        return {
          subscribe: (sink) => {
            const dispose = sseClient.subscribe(operation, sink)
            return {
              unsubscribe: dispose,
            }
          },
        }
      },
    }),
  ],
})

interface GraphQLProviderProps {
  children: React.ReactNode
}

export function GraphQLProvider({ children }: GraphQLProviderProps) {
  return <Provider value={client}>{children}</Provider>
}
This configuration creates a GraphQL client that can handle queries, mutations, and real-time subscriptions through SSE. The subscriptionExchange forwards GraphQL subscription operations to the SSE client, enabling real-time communication with your agents.
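On the wire, SSE is a plain text stream: each event is a group of "field: value" lines terminated by a blank line. The sketch below parses that framing from any io.Reader in Go. It is a simplified parser that handles only the event and data fields. The event names next and complete follow the GraphQL over SSE protocol that graphql-sse implements, and the sample payload is illustrative rather than captured from a Modus server:

```go
package main

import (
	"bufio"
	"fmt"
	"io"
	"strings"
)

// sseEvent is one parsed Server-Sent Event.
type sseEvent struct {
	Event string
	Data  string
}

// parseSSE reads events until EOF. It handles the event and data fields,
// skips comment lines, and ignores other fields such as id and retry.
// Note: bufio.Scanner limits a single line to 64 KB by default.
func parseSSE(r io.Reader) ([]sseEvent, error) {
	var events []sseEvent
	var cur sseEvent
	var dataLines []string
	pending := false

	scanner := bufio.NewScanner(r)
	for scanner.Scan() {
		line := scanner.Text()
		if line == "" { // a blank line dispatches the buffered event
			if pending {
				cur.Data = strings.Join(dataLines, "\n")
				events = append(events, cur)
			}
			cur, dataLines, pending = sseEvent{}, nil, false
			continue
		}
		if strings.HasPrefix(line, ":") {
			continue // comment line, often used as a keep-alive
		}
		field, value, _ := strings.Cut(line, ":")
		value = strings.TrimPrefix(value, " ")
		switch field {
		case "event":
			cur.Event = value
			pending = true
		case "data":
			dataLines = append(dataLines, value)
			pending = true
		}
	}
	return events, scanner.Err()
}

// parseSSEString is a convenience wrapper for string input.
func parseSSEString(s string) ([]sseEvent, error) {
	return parseSSE(strings.NewReader(s))
}

func main() {
	stream := "event: next\n" +
		"data: {\"data\":{\"agentEvent\":{\"name\":\"agent_awakened\"}}}\n\n" +
		": keep-alive\n\n" +
		"event: complete\ndata:\n\n"
	events, err := parseSSEString(stream)
	if err != nil {
		panic(err)
	}
	for _, e := range events {
		fmt.Printf("%s %q\n", e.Event, e.Data)
	}
}
```

In a browser, graphql-sse does this parsing for you; the sketch is mainly useful for debugging a stream with a raw HTTP client or for consuming events from a backend service.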
Subscribe to your surveillance agents and watch the Matrix in real-time:
import { useEffect } from "react"
import { useSubscription } from "urql"

// showCriticalAlert, notifyResistanceCommand, updateProgressBar,
// showProgressMessage, and LiveIntelligenceFeed are app-specific and
// defined elsewhere in your frontend.
const AGENT_INTELLIGENCE_SUBSCRIPTION = `
  subscription SurveillanceIntelligence($agentId: String!) {
    agentEvent(agentId: $agentId) {
      name
      data
      timestamp
    }
  }
`

function SurveillanceMonitor({ agentId }: { agentId: string }) {
  const [{ data }] = useSubscription({
    query: AGENT_INTELLIGENCE_SUBSCRIPTION,
    variables: { agentId },
  })

  useEffect(() => {
    if (data?.agentEvent) {
      const event = data.agentEvent
      if (event.name === "matrix_threat_detected") {
        // Instant threat alert with user-friendly notification
        showCriticalAlert({
          title: "⚠️ Matrix Threat Detected",
          message: event.data.user_notification,
          action: event.data.recommended_action,
        })
        notifyResistanceCommand(event.data)
      }
      if (event.name === "threat_analysis_progress") {
        // Real-time progress updates eliminate loading anxiety
        updateProgressBar(event.data.progress)
        showProgressMessage(event.data.user_friendly_message)
      }
    }
  }, [data])

  return <LiveIntelligenceFeed />
}
Your frontend receives live intelligence updates that can power real-time dashboards, instant notifications, and progressive user interfaces—no polling, no waiting, just pure real-time surveillance intelligence streaming directly from your agents.
The living intelligence experience
Here's what real-time agent intelligence looks like when Neo awakens in the Matrix:
Agent Awakening (User sees engaging personality):
Subscription receives: {
"eventType": "agent_awakened",
"message": "Neo has entered the Matrix. Surveillance protocols active.",
"status": "operational",
"awakening_time": "14:30:15"
}
UI shows: "🕶️ Neo is awake and monitoring the Matrix for you..."
Real-time Progress (No more loading anxiety):
Agent processes threat data...
Subscription receives: {
"eventType": "threat_analysis_progress",
"progress": 0.6,
"user_friendly_message": "🔎 Analyzing Smith patterns - almost done!",
"matrix_data": "green_code_cascading"
}
UI shows: Progress bar + "Neo is scanning behavioral patterns... 60% complete"
Green Matrix code animation in background
Instant Alerts (Users feel informed and in control):
CRITICAL ALERT - The machines have evolved:
Subscription receives: {
"eventType": "matrix_threat_detected",
"user_notification": "🚨 Critical threat detected! Smith is acting suspiciously in Downtown Loop - Financial District",
"recommended_action": "Consider immediate defensive measures",
"threat_level": 9,
"confidence": 0.9
}
UI shows: Red alert modal with pulsing warning
Audio: "Warning - Agent Smith threat detected"
Action buttons: "Alert Morpheus" | "Evacuate Sector" | "Monitor Closely"
Let's test the full surveillance cycle
Now let's really put our surveillance network through its paces. The machines are learning, and we need to see how our agents handle escalating threats:
Normal threat analysis
First, let's analyze a routine Agent sighting:
mutation {
analyzeThreatData(
agentId: "agent_neo_001"
threatData: {
agentName: "Johnson"
location: "Megacity Financial - Floor 23"
behavior: "Standard patrol pattern. Normal behavioral parameters within expected ranges."
threatLevel: 3
timestamp: "2025-06-12T14:45:00Z"
}
)
}
Response:
{
"data": {
"analyzeThreatData": "Threat analysis complete. Alert level: 3"
}
}
Your event stream shows normal operations:
{
"data": {
"agentEvent": {
"name": "threat_analysis_complete",
"data": {
"message": "✅ Analysis complete for Johnson",
"threat_level": 3,
"status": "normal",
"user_friendly_message": "Threat analyzed - all systems normal"
},
"timestamp": "2025-06-12T14:45:23Z"
}
}
}
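Any client can consume this payload, not only a browser. Here is a sketch of decoding the envelope and routing on the event name in Go. The struct shapes are inferred from the sample response above, and the returned strings stand in for whatever your application would do with each event:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// eventEnvelope mirrors the subscription response shape shown above.
type eventEnvelope struct {
	Data struct {
		AgentEvent struct {
			Name      string          `json:"name"`
			Data      json.RawMessage `json:"data"`
			Timestamp string          `json:"timestamp"`
		} `json:"agentEvent"`
	} `json:"data"`
}

// threatPayload covers the fields this sketch reads from event data.
type threatPayload struct {
	ThreatLevel      int    `json:"threat_level"`
	UserMessage      string `json:"user_friendly_message"`
	UserNotification string `json:"user_notification"`
}

// route decodes one response and returns a short description of the action
// taken. Event names match the ones published by the agent.
func route(raw string) (string, error) {
	var env eventEnvelope
	if err := json.Unmarshal([]byte(raw), &env); err != nil {
		return "", err
	}
	ev := env.Data.AgentEvent

	var p threatPayload
	if len(ev.Data) > 0 {
		if err := json.Unmarshal(ev.Data, &p); err != nil {
			return "", err
		}
	}

	switch ev.Name {
	case "matrix_threat_detected":
		return fmt.Sprintf("ALERT (level %d): %s", p.ThreatLevel, p.UserNotification), nil
	case "threat_analysis_complete":
		return fmt.Sprintf("ok (level %d): %s", p.ThreatLevel, p.UserMessage), nil
	default:
		return "ignored: " + ev.Name, nil
	}
}

func main() {
	out, err := route(`{"data":{"agentEvent":{"name":"threat_analysis_complete","data":{"threat_level":3,"user_friendly_message":"Threat analyzed - all systems normal"},"timestamp":"2025-06-12T14:45:23Z"}}}`)
	if err != nil {
		panic(err)
	}
	fmt.Println(out)
}
```

Keeping data as json.RawMessage until the name is known lets each event type grow its own payload struct later without changing the envelope.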
Critical threat escalation
Now the real test - Agent Smith is back, and he's brought friends:
mutation {
  analyzeThreatData(
    agentId: "agent_neo_001"
    threatData: {
      agentName: "Smith"
      location: "Server Farms - Core Access Level"
      behavior: """
      CRITICAL: Smith has replicated into 47 instances simultaneously.
      Instances displaying coordinated behavior suggesting hive-mind connectivity.
      Attempting to access core Matrix systems. Immediate threat to all resistance operations.
      """
      threatLevel: 10
      timestamp: "2025-06-12T15:00:00Z"
    }
  )
}
Response:
{
"data": {
"analyzeThreatData": "Threat analysis complete. Alert level: 10"
}
}
Your event stream explodes with critical intelligence:
{
"data": {
"agentEvent": {
"name": "matrix_threat_detected",
"data": {
"message": "ALERT: Smith displaying anomalous behavior",
"threat_level": 10,
"location": "Server Farms - Core Access Level",
"severity": "CRITICAL",
"user_notification": "🚨 Critical threat detected! Smith is acting suspiciously in Server Farms - Core Access Level",
"recommended_action": "Consider immediate defensive measures",
"confidence": 1.0
},
"timestamp": "2025-06-12T15:00:15Z"
}
}
}
Your UI immediately shows red alerts, sirens blare, and the resistance command center springs into action. Neo-001 has done his job—the resistance is warned before it's too late.
Check agent status after the battle
Let's see how our surveillance agent is holding up after processing multiple threats:
query {
getSurveillanceStatus(agentId: "agent_neo_001")
}
Response:
{
"data": {
"getSurveillanceStatus": "
Surveillance Agent Neo-001
Status:
Operational: Active in the Matrix
Monitoring 3 sectors: Downtown Loop, Financial District, Server Farms
Last threat analysis: 2025-06-12 15:00:15
Current threat level: 1.0 (MAXIMUM ALERT)
Intelligence database: 3 threat patterns analyzed
Recommendation: All resistance cells should maintain radio silence"
}
}
Perfect! Our agent is maintaining state, tracking multiple incidents, and providing tactical recommendations based on accumulated intelligence.
Why this transforms user experience
Real-time feedback eliminates anxiety: users see progress instead of wondering if their request was received or if something broke. When Neo scans the Matrix, users watch the green code cascade in real-time.
Personality-driven communication: agents feel like helpful team members rather than robotic tools, creating emotional connection and trust. Neo doesn't just process data—he awakens, he warns, he protects.
Instant notifications keep users informed: critical alerts arrive immediately, not after polling intervals or task completion. When Agent Smith evolves, the resistance knows within seconds.
Progressive interactions build confidence: when users see agents actively working, they trust the system and feel in control of the process. Every scan, every analysis, every threat detection is visible and meaningful.
Real-world applications beyond the Matrix
While Neo's surveillance system demonstrates the technical capabilities, the same real-time agent patterns solve everyday business challenges:
Sales Intelligence Agents: deploy agents that research prospects, monitor competitor activity, and stream live updates as they discover new opportunities. Sales teams receive instant notifications when high-value leads take action, while watching real-time progress as agents analyze company data, social media activity, and market signals.
m.PublishEvent(map[string]interface{}{
	"eventType":          "prospect_qualified",
	"message":            "High-value prospect detected at Acme Corp",
	"company":            "Acme Corp",
	"decision_maker":     "Sarah Johnson, VP Engineering",
	"buying_signals":     []string{"downloaded whitepaper", "visited pricing page 3x", "team expansion hiring"},
	"confidence_score":   0.87,
	"user_notification":  "🎯 Hot lead alert! Acme Corp shows strong buying signals",
	"recommended_action": "Schedule demo call within 24 hours",
})
Customer Service Coordinators: build agents that track support tickets across multiple systems, coordinate with different teams, and provide customers with live updates. Users see "Sarah is reviewing your case with the billing team" instead of generic "we're looking into it" responses.
m.PublishEvent(map[string]interface{}{
	"eventType":            "case_escalated",
	"message":              "Escalating billing issue to finance team",
	"ticket_id":            "CS-2024-5891",
	"customer":             "john@techstartup.com",
	"issue_type":           "billing_dispute",
	"current_handler":      "Sarah Chen, Senior Support",
	"escalated_to":         "Finance Team",
	"user_notification":    "👥 Sarah is now working with our billing specialists on your case",
	"estimated_resolution": "2 hours",
})
Marketing Campaign Optimizers: create agents that monitor campaign performance, test variations, and stream insights as they discover what's working. Marketing teams receive real-time alerts when campaigns exceed thresholds or detect unusual patterns, with full visibility into the agent's analysis process.
m.PublishEvent(map[string]interface{}{
	"eventType":          "campaign_breakthrough",
	"message":            "LinkedIn campaign exceeding performance targets",
	"campaign_name":      "Q2 SaaS Growth Campaign",
	"metric":             "cost_per_lead",
	"current_value":      24.50,
	"target_value":       35.00,
	"improvement":        "30% below target",
	"user_notification":  "🚀 Your LinkedIn campaign is crushing it! 30% better than expected",
	"recommended_action": "Increase budget allocation by 25%",
})
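The "30% below target" figure follows from the two values in the event: (35.00 - 24.50) / 35.00 = 0.30. Computing it rather than hard-coding it keeps the message in sync with the numbers. A small sketch, where percentBelowTarget is a hypothetical helper name and rounding to a whole percent is a presentation choice:

```go
package main

import (
	"fmt"
	"math"
)

// percentBelowTarget returns how far current sits below target, as a whole
// percentage of target. A negative result means current is above target.
func percentBelowTarget(current, target float64) float64 {
	if target == 0 {
		return 0 // avoid dividing by zero when no target is set
	}
	return math.Round((target - current) / target * 100)
}

func main() {
	pct := percentBelowTarget(24.50, 35.00)
	fmt.Printf("%.0f%% below target\n", pct)
}
```

For a cost metric like cost_per_lead, lower is better, so a positive result here is good news. A metric where higher is better would need the sign flipped.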
The Matrix surveillance system shows how to build these experiences—persistent agents with memory, real-time communication, and engaging personalities that transform mundane business processes into responsive, intelligent systems.
Conclusion
We've moved from silent AI workers to living intelligence systems that create magical user experiences. Your agents now have voices—they can share progress, express personality, and keep users engaged through real-time communication via the elegant GraphQL interface you already know.
The Matrix surveillance system demonstrates how Modus agents transform from isolated processes into engaging experiences that feel genuinely alive. Neo isn't just code running in a container—he's a resistance operative with personality, memory, and the ability to warn us before the machines strike.
With stateful agents and real-time event streaming, you're not just building better AI systems—you're creating intelligence networks that users love to interact with and developers love to build with.
Whether you're building sales intelligence, customer service coordination, marketing optimization, or any other business app, the patterns we've explored give you the foundation for agents that feel genuinely helpful and responsive. If Modus agents can keep Neo one step ahead of Agent Smith, imagine what they can do for your sales team's prospect research, your customer service team's case coordination, or your marketing team's campaign optimization.
The machines may be learning, but now we've got agents that can keep up. Welcome to the real world of intelligent agents.
To learn more about real-time Modus agents, check out our open source Modus recipes repository.