Daemon Architecture
The daemon (owld) is OWL's core service, handling all AI logic and persistent state.
Overview
owl/daemon/
├── main.py # Entry point, initialization
├── server.py # Unix socket server, request handling
├── protocol.py # Communication protocol definitions
└── project.py # Project detection
Server Lifecycle
Startup
# main.py
def main():
    """Daemon entry point: verify Ollama connectivity, then start the server.

    Exits with status 1 if the LLM backend is unreachable, so supervisors
    can detect a failed start immediately.
    """
    config = get_config()  # NOTE(review): loaded but not passed on — presumably DaemonServer reads global config; confirm
    # Check Ollama connectivity before binding the socket; fail fast otherwise.
    llm = LLMClient()
    if not llm.health_check():
        # Diagnostics go to stderr so stdout stays clean for tooling.
        print("Cannot connect to Ollama", file=sys.stderr)
        sys.exit(1)
    # Start server (blocks until shutdown)
    server = DaemonServer()
    server.start()
Server Initialization
# server.py
class DaemonServer:
    """Unix-socket server that owns all daemon state and routes requests."""

    def __init__(self):
        # Core components — each getter returns a process-wide singleton.
        self.llm = LLMClient()
        self.soul = get_soul()
        self.memory = get_memory_store()
        self.knowledge = get_knowledge_store()
        self.tool_registry = get_tool_registry()
        # State
        self.session_id = self._get_or_create_session()
        # No project until a PROJECT request sets one.
        self.current_project = None
Request Handling
def _handle_client(self, client: socket.socket):
    """Read one length-prefixed request from *client* and dispatch it.

    Streaming requests write their own chunks directly to the socket;
    every other request type returns a single Response that is
    serialized back here.
    """
    # Read request
    data = self._recv_message(client)
    request = Request.from_json(data)
    # Route to handler
    if request.type == RequestType.STREAM:
        self._handle_stream(request, client)
    else:
        handler = self.handlers[request.type]
        response = handler(request)
        self._send_message(client, response.to_json())
Request Types
| Type | Handler | Description |
|---|---|---|
| CHAT | _handle_chat | Non-streaming chat |
| STREAM | _handle_stream | Streaming chat |
| ABORT | _handle_abort | Cancel current request |
| PROJECT | _handle_project | Set/get project |
| SESSION | _handle_session | Session management |
| TOOLS | _handle_tools | Tool profiles |
| MEMORY | _handle_memory | Memory operations |
| KNOWLEDGE | _handle_knowledge | Knowledge base |
| SOUL | _handle_soul | View/modify soul |
| EVOLVE | _handle_evolve | Trigger evolution |
| STATUS | _handle_status | Daemon status |
| SHUTDOWN | _handle_shutdown | Stop daemon |
Streaming Handler
The streaming handler is the most complex, handling tool loops:
def _handle_stream(self, request: Request, client: socket.socket):
    """Streaming chat handler: build context, run the tool loop, stream chunks.

    Repeatedly calls the LLM; each round either streams the final answer
    (no tool calls) or executes the requested tools and feeds their
    results back into the message list for the next round.
    """
    message = request.payload.get("message")
    # Build context
    builder = ContextBuilder()
    builder.with_project(self.current_project)
    builder.with_knowledge_query(message)
    # Get history
    history = self.memory.get_conversation(self.session_id)
    messages = builder.build(message, history)
    # Tool loop
    while True:
        # Check abort/timeout between rounds
        if self._should_stop():
            break
        # Call LLM
        # NOTE(review): `tools` is not defined in this snippet — presumably
        # the schemas come from self.tool_registry; confirm against server.py.
        response = self.llm.chat(messages, tools=tools)
        # No tool calls - stream the final response and exit the loop
        if not response.tool_calls:
            for chunk in self.llm.chat_stream(messages):
                self._send_chunk(client, StreamChunk(content=chunk))
            break
        # Execute tools
        for tool_call in response.tool_calls:
            # Send tool_call notification so the client can render progress
            self._send_chunk(client, StreamChunk(
                tool_call={"name": tool_call.name, "args": tool_call.arguments}
            ))
            # Execute
            result = self.tool_registry.execute(tool_call.name, tool_call.arguments)
            # Send tool_result notification; an "error" key marks failure
            self._send_chunk(client, StreamChunk(
                tool_result={"name": tool_call.name, "success": "error" not in result}
            ))
            # Add to context for the next LLM round
            messages.append(Message(role="tool", content=json.dumps(result)))
    # Store conversation
    self.memory.add_message(self.session_id, "user", message)
    # NOTE(review): `final_content` is never assigned in this snippet —
    # presumably accumulated from the streamed chunks; confirm in server.py.
    self.memory.add_message(self.session_id, "assistant", final_content)
    # Async tasks — fire-and-forget so the response is not delayed
    self.reflector.reflect_async(self.session_id)
    self.summarizer.summarize_async(self.session_id)
Run State Management
class RunState(Enum):
    """Lifecycle of a single LLM run, used for abort/timeout tracking."""
    QUEUED = "queued"      # accepted, waiting for the run lock
    RUNNING = "running"    # LLM/tool loop in progress
    STREAMING = "streaming"  # final answer being streamed to the client
    DONE = "done"          # completed normally
    ABORTED = "aborted"    # client requested cancellation
    TIMEOUT = "timeout"    # exceeded the configured run timeout
    ERROR = "error"        # failed with an exception
The server tracks run state for abort/timeout handling:
def _start_run(self) -> str:
    """Begin a new run: take the run lock and reset per-run state.

    Returns the new 8-character run id. The lock serializes LLM work —
    only one run may be active at a time.
    """
    # NOTE(review): acquired here, presumably released in _end_run —
    # confirm every code path (including errors) ends the run.
    self._run_lock.acquire()
    self._current_run_id = str(uuid.uuid4())[:8]
    self._run_state = RunState.RUNNING
    self._abort_requested.clear()
    self._run_start_time = time.time()
    return self._current_run_id
def _check_abort(self) -> bool:
    """Return True if the client has requested this run be aborted."""
    return self._abort_requested.is_set()
def _check_timeout(self) -> bool:
    """Return True once the current run has exceeded the configured timeout."""
    elapsed = time.time() - self._run_start_time
    return elapsed > self._timeout
Protocol
Message Format
All messages are length-prefixed JSON:
[4 bytes: length][JSON payload]
Request
@dataclass
class Request:
    """Client -> daemon message, decoded from length-prefixed JSON."""
    type: RequestType        # which handler to route to
    payload: Dict[str, Any]  # handler-specific arguments
Response
@dataclass
class Response:
    """Daemon -> client reply for non-streaming requests."""
    success: bool                 # True when the handler completed normally
    data: Dict[str, Any]          # handler-specific result payload
    error: Optional[str] = None   # human-readable failure reason
    stream: bool = False          # True when chunks follow on the socket
Stream Chunk
@dataclass
class StreamChunk:
    """One unit of a streamed response.

    All optional fields default to None: dataclasses forbid fields
    without defaults after fields with defaults, so the original
    declaration (e.g. `tool_call: Optional[Dict]` after `content = ""`)
    would raise TypeError at class creation.
    """
    content: str = ""                    # Text content
    tool_call: Optional[Dict] = None     # Tool being called
    tool_result: Optional[Dict] = None   # Tool completed
    stage: Optional[str] = None          # Current stage
    ask_user: Optional[Dict] = None      # Question for user
    done: bool = False                   # Stream complete
    error: Optional[str] = None          # Error message
    aborted: bool = False                # Was aborted
Project Detection
# project.py
def detect_project(path: str) -> ProjectContext:
    """Detect project type and frameworks.

    Scans *path* for well-known marker files to determine the language,
    then checks for framework entry points.

    NOTE(review): the annotation says str, but the body uses the `/`
    operator — callers presumably pass a pathlib.Path; confirm.
    """
    # Check for project markers: filename -> (language, build tool)
    markers = {
        "pyproject.toml": ("python", None),
        "setup.py": ("python", None),
        "package.json": ("javascript", None),
        "Cargo.toml": ("rust", None),
        "go.mod": ("go", None),
        "pom.xml": ("java", "maven"),
        "build.gradle": ("java", "gradle"),
    }
    # First marker found wins; the original snippet never consulted the
    # dict and left project_type undefined.
    project_type = None
    for marker, (language, _build_tool) in markers.items():
        if (path / marker).exists():
            project_type = language
            break
    # Detect frameworks
    frameworks = []
    if (path / "manage.py").exists():
        frameworks.append("django")
    if (path / "app.py").exists():
        frameworks.append("flask")
    # ... more detection
    return ProjectContext(
        path=str(path),
        type=project_type,
        frameworks=frameworks,
        key_dirs=find_key_directories(path)
    )
Concurrency
Thread Safety
- `_run_lock` serializes LLM requests — one active run at a time
- Multiple CLI connections allowed
Async Operations
Background tasks don't block responses:
# Reflection runs in background thread
self.reflector.reflect_async(session_id)
# Summarization runs in background
self.summarizer.summarize_async(session_id)
Error Handling
def _handle_client(self, client: socket.socket):
    """Handle one client connection; any failure becomes an error Response."""
    try:
        # Handle request
        ...
    except Exception as e:
        # Boundary handler: report the error in-band rather than
        # crashing the daemon on a single bad request.
        error_response = Response.err(str(e))
        self._send_message(client, error_response.to_json())
    finally:
        # Always release the connection, success or failure.
        client.close()
For streaming:
try:
    # Stream response
    ...
except Exception as e:
    # Surface the failure in-band so the client stops waiting for chunks,
    # then mark the run as failed (also releases the run lock).
    self._send_chunk(client, StreamChunk(error=str(e), done=True))
    self._end_run(RunState.ERROR)