Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions internal/db/sqlite.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,10 @@ func (db *DB) migrate() error {
`ALTER TABLE tasks ADD COLUMN executor TEXT DEFAULT 'claude'`, // Task executor: "claude" (default), "codex"
// Tmux window ID for unique window identification (avoids duplicate window issues)
`ALTER TABLE tasks ADD COLUMN tmux_window_id TEXT DEFAULT ''`, // tmux window ID (e.g., "@1234")
// Distilled task summary for search indexing and context
`ALTER TABLE tasks ADD COLUMN summary TEXT DEFAULT ''`, // Distilled summary of what was accomplished
// Last distillation timestamp for tracking when to re-distill
`ALTER TABLE tasks ADD COLUMN last_distilled_at DATETIME`, // When task was last distilled
// Tmux pane IDs for deterministic pane identification (avoids index-based guessing)
`ALTER TABLE tasks ADD COLUMN claude_pane_id TEXT DEFAULT ''`, // tmux pane ID for Claude/executor pane (e.g., "%1234")
`ALTER TABLE tasks ADD COLUMN shell_pane_id TEXT DEFAULT ''`, // tmux pane ID for shell pane (e.g., "%1235")
Expand Down
114 changes: 78 additions & 36 deletions internal/db/tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ type Task struct {
DangerousMode bool // Whether task is running in dangerous mode (--dangerously-skip-permissions)
Pinned bool // Whether the task is pinned to the top of its column
Tags string // Comma-separated tags for categorization (e.g., "customer-support,email,influence-kit")
Summary string // Distilled summary of what was accomplished (for search and context)
CreatedAt LocalTime
UpdatedAt LocalTime
StartedAt *LocalTime
Expand All @@ -39,6 +40,8 @@ type Task struct {
ScheduledAt *LocalTime // When to next run (nil = not scheduled)
Recurrence string // Deprecated: no longer used (kept for backward compatibility)
LastRunAt *LocalTime // When last executed (for scheduled tasks)
// Distillation tracking
LastDistilledAt *LocalTime // When task was last distilled for learnings
}

// Task statuses
Expand Down Expand Up @@ -162,18 +165,18 @@ func (db *DB) GetTask(id int64) (*Task, error) {
COALESCE(daemon_session, ''), COALESCE(tmux_window_id, ''),
COALESCE(claude_pane_id, ''), COALESCE(shell_pane_id, ''),
COALESCE(pr_url, ''), COALESCE(pr_number, 0),
COALESCE(dangerous_mode, 0), COALESCE(pinned, 0), COALESCE(tags, ''),
COALESCE(dangerous_mode, 0), COALESCE(pinned, 0), COALESCE(tags, ''), COALESCE(summary, ''),
created_at, updated_at, started_at, completed_at,
scheduled_at, recurrence, last_run_at
scheduled_at, recurrence, last_run_at, last_distilled_at
FROM tasks WHERE id = ?
`, id).Scan(
&t.ID, &t.Title, &t.Body, &t.Status, &t.Type, &t.Project, &t.Executor,
&t.WorktreePath, &t.BranchName, &t.Port, &t.ClaudeSessionID,
&t.DaemonSession, &t.TmuxWindowID, &t.ClaudePaneID, &t.ShellPaneID,
&t.PRURL, &t.PRNumber,
&t.DangerousMode, &t.Pinned, &t.Tags,
&t.DangerousMode, &t.Pinned, &t.Tags, &t.Summary,
&t.CreatedAt, &t.UpdatedAt, &t.StartedAt, &t.CompletedAt,
&t.ScheduledAt, &t.Recurrence, &t.LastRunAt,
&t.ScheduledAt, &t.Recurrence, &t.LastRunAt, &t.LastDistilledAt,
)
if err == sql.ErrNoRows {
return nil, nil
Expand Down Expand Up @@ -202,9 +205,9 @@ func (db *DB) ListTasks(opts ListTasksOptions) ([]*Task, error) {
COALESCE(daemon_session, ''), COALESCE(tmux_window_id, ''),
COALESCE(claude_pane_id, ''), COALESCE(shell_pane_id, ''),
COALESCE(pr_url, ''), COALESCE(pr_number, 0),
COALESCE(dangerous_mode, 0), COALESCE(pinned, 0), COALESCE(tags, ''),
COALESCE(dangerous_mode, 0), COALESCE(pinned, 0), COALESCE(tags, ''), COALESCE(summary, ''),
created_at, updated_at, started_at, completed_at,
scheduled_at, recurrence, last_run_at
scheduled_at, recurrence, last_run_at, last_distilled_at
FROM tasks WHERE 1=1
`
args := []interface{}{}
Expand Down Expand Up @@ -254,9 +257,9 @@ func (db *DB) ListTasks(opts ListTasksOptions) ([]*Task, error) {
&t.WorktreePath, &t.BranchName, &t.Port, &t.ClaudeSessionID,
&t.DaemonSession, &t.TmuxWindowID, &t.ClaudePaneID, &t.ShellPaneID,
&t.PRURL, &t.PRNumber,
&t.DangerousMode, &t.Pinned, &t.Tags,
&t.DangerousMode, &t.Pinned, &t.Tags, &t.Summary,
&t.CreatedAt, &t.UpdatedAt, &t.StartedAt, &t.CompletedAt,
&t.ScheduledAt, &t.Recurrence, &t.LastRunAt,
&t.ScheduledAt, &t.Recurrence, &t.LastRunAt, &t.LastDistilledAt,
)
if err != nil {
return nil, fmt.Errorf("scan task: %w", err)
Expand All @@ -277,9 +280,9 @@ func (db *DB) GetMostRecentlyCreatedTask() (*Task, error) {
COALESCE(daemon_session, ''), COALESCE(tmux_window_id, ''),
COALESCE(claude_pane_id, ''), COALESCE(shell_pane_id, ''),
COALESCE(pr_url, ''), COALESCE(pr_number, 0),
COALESCE(dangerous_mode, 0), COALESCE(pinned, 0), COALESCE(tags, ''),
COALESCE(dangerous_mode, 0), COALESCE(pinned, 0), COALESCE(tags, ''), COALESCE(summary, ''),
created_at, updated_at, started_at, completed_at,
scheduled_at, recurrence, last_run_at
scheduled_at, recurrence, last_run_at, last_distilled_at
FROM tasks
ORDER BY created_at DESC, id DESC
LIMIT 1
Expand All @@ -288,9 +291,9 @@ func (db *DB) GetMostRecentlyCreatedTask() (*Task, error) {
&t.WorktreePath, &t.BranchName, &t.Port, &t.ClaudeSessionID,
&t.DaemonSession, &t.TmuxWindowID, &t.ClaudePaneID, &t.ShellPaneID,
&t.PRURL, &t.PRNumber,
&t.DangerousMode, &t.Pinned, &t.Tags,
&t.DangerousMode, &t.Pinned, &t.Tags, &t.Summary,
&t.CreatedAt, &t.UpdatedAt, &t.StartedAt, &t.CompletedAt,
&t.ScheduledAt, &t.Recurrence, &t.LastRunAt,
&t.ScheduledAt, &t.Recurrence, &t.LastRunAt, &t.LastDistilledAt,
)
if err == sql.ErrNoRows {
return nil, nil
Expand All @@ -315,9 +318,9 @@ func (db *DB) SearchTasks(query string, limit int) ([]*Task, error) {
COALESCE(daemon_session, ''), COALESCE(tmux_window_id, ''),
COALESCE(claude_pane_id, ''), COALESCE(shell_pane_id, ''),
COALESCE(pr_url, ''), COALESCE(pr_number, 0),
COALESCE(dangerous_mode, 0), COALESCE(pinned, 0), COALESCE(tags, ''),
COALESCE(dangerous_mode, 0), COALESCE(pinned, 0), COALESCE(tags, ''), COALESCE(summary, ''),
created_at, updated_at, started_at, completed_at,
scheduled_at, recurrence, last_run_at
scheduled_at, recurrence, last_run_at, last_distilled_at
FROM tasks
WHERE (
title LIKE ? COLLATE NOCASE
Expand Down Expand Up @@ -345,9 +348,9 @@ func (db *DB) SearchTasks(query string, limit int) ([]*Task, error) {
&t.WorktreePath, &t.BranchName, &t.Port, &t.ClaudeSessionID,
&t.DaemonSession, &t.TmuxWindowID, &t.ClaudePaneID, &t.ShellPaneID,
&t.PRURL, &t.PRNumber,
&t.DangerousMode, &t.Pinned, &t.Tags,
&t.DangerousMode, &t.Pinned, &t.Tags, &t.Summary,
&t.CreatedAt, &t.UpdatedAt, &t.StartedAt, &t.CompletedAt,
&t.ScheduledAt, &t.Recurrence, &t.LastRunAt,
&t.ScheduledAt, &t.Recurrence, &t.LastRunAt, &t.LastDistilledAt,
)
if err != nil {
return nil, fmt.Errorf("scan task: %w", err)
Expand Down Expand Up @@ -443,6 +446,19 @@ func (db *DB) UpdateTaskDangerousMode(taskID int64, dangerousMode bool) error {
return nil
}

// SaveTaskSummary updates the distilled summary for a task.
// This is called after task completion to store a concise summary for search and context.
func (db *DB) SaveTaskSummary(taskID int64, summary string) error {
_, err := db.Exec(`
UPDATE tasks SET summary = ?, updated_at = CURRENT_TIMESTAMP
WHERE id = ?
`, summary, taskID)
if err != nil {
return fmt.Errorf("save task summary: %w", err)
}
return nil
}

// UpdateTaskPinned updates only the pinned flag for a task.
func (db *DB) UpdateTaskPinned(taskID int64, pinned bool) error {
_, err := db.Exec(`
Expand Down Expand Up @@ -574,9 +590,9 @@ func (db *DB) GetNextQueuedTask() (*Task, error) {
COALESCE(daemon_session, ''), COALESCE(tmux_window_id, ''),
COALESCE(claude_pane_id, ''), COALESCE(shell_pane_id, ''),
COALESCE(pr_url, ''), COALESCE(pr_number, 0),
COALESCE(dangerous_mode, 0), COALESCE(pinned, 0), COALESCE(tags, ''),
COALESCE(dangerous_mode, 0), COALESCE(pinned, 0), COALESCE(tags, ''), COALESCE(summary, ''),
created_at, updated_at, started_at, completed_at,
scheduled_at, recurrence, last_run_at
scheduled_at, recurrence, last_run_at, last_distilled_at
FROM tasks
WHERE status = ?
ORDER BY created_at ASC
Expand All @@ -586,9 +602,9 @@ func (db *DB) GetNextQueuedTask() (*Task, error) {
&t.WorktreePath, &t.BranchName, &t.Port, &t.ClaudeSessionID,
&t.DaemonSession, &t.TmuxWindowID, &t.ClaudePaneID, &t.ShellPaneID,
&t.PRURL, &t.PRNumber,
&t.DangerousMode, &t.Pinned, &t.Tags,
&t.DangerousMode, &t.Pinned, &t.Tags, &t.Summary,
&t.CreatedAt, &t.UpdatedAt, &t.StartedAt, &t.CompletedAt,
&t.ScheduledAt, &t.Recurrence, &t.LastRunAt,
&t.ScheduledAt, &t.Recurrence, &t.LastRunAt, &t.LastDistilledAt,
)
if err == sql.ErrNoRows {
return nil, nil
Expand All @@ -607,9 +623,9 @@ func (db *DB) GetQueuedTasks() ([]*Task, error) {
COALESCE(daemon_session, ''), COALESCE(tmux_window_id, ''),
COALESCE(claude_pane_id, ''), COALESCE(shell_pane_id, ''),
COALESCE(pr_url, ''), COALESCE(pr_number, 0),
COALESCE(dangerous_mode, 0), COALESCE(pinned, 0), COALESCE(tags, ''),
COALESCE(dangerous_mode, 0), COALESCE(pinned, 0), COALESCE(tags, ''), COALESCE(summary, ''),
created_at, updated_at, started_at, completed_at,
scheduled_at, recurrence, last_run_at
scheduled_at, recurrence, last_run_at, last_distilled_at
FROM tasks
WHERE status = ?
ORDER BY created_at ASC
Expand All @@ -627,9 +643,9 @@ func (db *DB) GetQueuedTasks() ([]*Task, error) {
&t.WorktreePath, &t.BranchName, &t.Port, &t.ClaudeSessionID,
&t.DaemonSession, &t.TmuxWindowID, &t.ClaudePaneID, &t.ShellPaneID,
&t.PRURL, &t.PRNumber,
&t.DangerousMode, &t.Pinned, &t.Tags,
&t.DangerousMode, &t.Pinned, &t.Tags, &t.Summary,
&t.CreatedAt, &t.UpdatedAt, &t.StartedAt, &t.CompletedAt,
&t.ScheduledAt, &t.Recurrence, &t.LastRunAt,
&t.ScheduledAt, &t.Recurrence, &t.LastRunAt, &t.LastDistilledAt,
); err != nil {
return nil, fmt.Errorf("scan task: %w", err)
}
Expand All @@ -647,9 +663,9 @@ func (db *DB) GetTasksWithBranches() ([]*Task, error) {
COALESCE(daemon_session, ''), COALESCE(tmux_window_id, ''),
COALESCE(claude_pane_id, ''), COALESCE(shell_pane_id, ''),
COALESCE(pr_url, ''), COALESCE(pr_number, 0),
COALESCE(dangerous_mode, 0), COALESCE(pinned, 0), COALESCE(tags, ''),
COALESCE(dangerous_mode, 0), COALESCE(pinned, 0), COALESCE(tags, ''), COALESCE(summary, ''),
created_at, updated_at, started_at, completed_at,
scheduled_at, recurrence, last_run_at
scheduled_at, recurrence, last_run_at, last_distilled_at
FROM tasks
WHERE branch_name != '' AND status NOT IN (?, ?)
ORDER BY created_at DESC
Expand All @@ -667,9 +683,9 @@ func (db *DB) GetTasksWithBranches() ([]*Task, error) {
&t.WorktreePath, &t.BranchName, &t.Port, &t.ClaudeSessionID,
&t.DaemonSession, &t.TmuxWindowID, &t.ClaudePaneID, &t.ShellPaneID,
&t.PRURL, &t.PRNumber,
&t.DangerousMode, &t.Pinned, &t.Tags,
&t.DangerousMode, &t.Pinned, &t.Tags, &t.Summary,
&t.CreatedAt, &t.UpdatedAt, &t.StartedAt, &t.CompletedAt,
&t.ScheduledAt, &t.Recurrence, &t.LastRunAt,
&t.ScheduledAt, &t.Recurrence, &t.LastRunAt, &t.LastDistilledAt,
); err != nil {
return nil, fmt.Errorf("scan task: %w", err)
}
Expand All @@ -689,9 +705,9 @@ func (db *DB) GetDueScheduledTasks() ([]*Task, error) {
COALESCE(daemon_session, ''), COALESCE(tmux_window_id, ''),
COALESCE(claude_pane_id, ''), COALESCE(shell_pane_id, ''),
COALESCE(pr_url, ''), COALESCE(pr_number, 0),
COALESCE(dangerous_mode, 0), COALESCE(pinned, 0), COALESCE(tags, ''),
COALESCE(dangerous_mode, 0), COALESCE(pinned, 0), COALESCE(tags, ''), COALESCE(summary, ''),
created_at, updated_at, started_at, completed_at,
scheduled_at, recurrence, last_run_at
scheduled_at, recurrence, last_run_at, last_distilled_at
FROM tasks
WHERE scheduled_at IS NOT NULL
AND scheduled_at <= CURRENT_TIMESTAMP
Expand All @@ -711,9 +727,9 @@ func (db *DB) GetDueScheduledTasks() ([]*Task, error) {
&t.WorktreePath, &t.BranchName, &t.Port, &t.ClaudeSessionID,
&t.DaemonSession, &t.TmuxWindowID, &t.ClaudePaneID, &t.ShellPaneID,
&t.PRURL, &t.PRNumber,
&t.DangerousMode, &t.Pinned, &t.Tags,
&t.DangerousMode, &t.Pinned, &t.Tags, &t.Summary,
&t.CreatedAt, &t.UpdatedAt, &t.StartedAt, &t.CompletedAt,
&t.ScheduledAt, &t.Recurrence, &t.LastRunAt,
&t.ScheduledAt, &t.Recurrence, &t.LastRunAt, &t.LastDistilledAt,
); err != nil {
return nil, fmt.Errorf("scan task: %w", err)
}
Expand All @@ -730,9 +746,9 @@ func (db *DB) GetScheduledTasks() ([]*Task, error) {
COALESCE(daemon_session, ''), COALESCE(tmux_window_id, ''),
COALESCE(claude_pane_id, ''), COALESCE(shell_pane_id, ''),
COALESCE(pr_url, ''), COALESCE(pr_number, 0),
COALESCE(dangerous_mode, 0), COALESCE(pinned, 0), COALESCE(tags, ''),
COALESCE(dangerous_mode, 0), COALESCE(pinned, 0), COALESCE(tags, ''), COALESCE(summary, ''),
created_at, updated_at, started_at, completed_at,
scheduled_at, recurrence, last_run_at
scheduled_at, recurrence, last_run_at, last_distilled_at
FROM tasks
WHERE scheduled_at IS NOT NULL
ORDER BY scheduled_at ASC
Expand All @@ -750,9 +766,9 @@ func (db *DB) GetScheduledTasks() ([]*Task, error) {
&t.WorktreePath, &t.BranchName, &t.Port, &t.ClaudeSessionID,
&t.DaemonSession, &t.TmuxWindowID, &t.ClaudePaneID, &t.ShellPaneID,
&t.PRURL, &t.PRNumber,
&t.DangerousMode, &t.Pinned, &t.Tags,
&t.DangerousMode, &t.Pinned, &t.Tags, &t.Summary,
&t.CreatedAt, &t.UpdatedAt, &t.StartedAt, &t.CompletedAt,
&t.ScheduledAt, &t.Recurrence, &t.LastRunAt,
&t.ScheduledAt, &t.Recurrence, &t.LastRunAt, &t.LastDistilledAt,
); err != nil {
return nil, fmt.Errorf("scan task: %w", err)
}
Expand Down Expand Up @@ -1772,6 +1788,32 @@ func (db *DB) FindSimilarTasks(task *Task, limit int) ([]*TaskSearchResult, erro
})
}

// UpdateTaskLastDistilledAt updates the last_distilled_at timestamp for a task.
// This is called after distilling learnings from a task to track when it was last processed.
func (db *DB) UpdateTaskLastDistilledAt(taskID int64, t time.Time) error {
_, err := db.Exec(`
UPDATE tasks SET last_distilled_at = ?, updated_at = CURRENT_TIMESTAMP
WHERE id = ?
`, LocalTime{Time: t}, taskID)
if err != nil {
return fmt.Errorf("update task last_distilled_at: %w", err)
}
return nil
}

// UpdateTaskStartedAt updates the started_at timestamp for a task.
// This is primarily used for testing.
func (db *DB) UpdateTaskStartedAt(taskID int64, t time.Time) error {
_, err := db.Exec(`
UPDATE tasks SET started_at = ?, updated_at = CURRENT_TIMESTAMP
WHERE id = ?
`, LocalTime{Time: t}, taskID)
if err != nil {
return fmt.Errorf("update task started_at: %w", err)
}
return nil
}

// GetTagsList returns all unique tags used across all tasks.
func (db *DB) GetTagsList() ([]string, error) {
rows, err := db.Query(`SELECT DISTINCT tags FROM tasks WHERE tags != ''`)
Expand Down
40 changes: 7 additions & 33 deletions internal/executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -882,6 +882,10 @@ func (e *Executor) executeTask(ctx context.Context, task *db.Task) {
result = execResult.toInternal()
}

// Check if we should distill learnings from this execution session
// This runs asynchronously and captures memories even for in-progress tasks
e.MaybeDistillTask(task)

// Check current status - hooks may have already set it
currentTask, _ := e.db.GetTask(task.ID)
currentStatus := ""
Expand Down Expand Up @@ -919,17 +923,14 @@ func (e *Executor) executeTask(ctx context.Context, task *db.Task) {
// Save transcript on completion
e.saveTranscriptOnCompletion(task.ID, workDir)

// Index task for future search/retrieval
e.indexTaskForSearch(task)

// NOTE: We intentionally do NOT kill the executor here - keep it running so user can
// easily retry/resume the task. Old done task executors are cleaned up after 2h
// by the cleanupOrphanedClaudes routine.

// Extract memories from successful task
// Distill and index the completed task (runs in background)
go func() {
if err := e.ExtractMemories(context.Background(), task); err != nil {
e.logger.Error("Memory extraction failed", "task", task.ID, "error", err)
if err := e.processCompletedTask(context.Background(), task); err != nil {
e.logger.Error("Task processing failed", "task", task.ID, "error", err)
}
}()
} else if result.NeedsInput {
Expand Down Expand Up @@ -2791,33 +2792,6 @@ func (e *Executor) getProjectMemoriesSection(project string) string {
return sb.String()
}

// indexTaskForSearch indexes a completed task in the FTS5 search table.
// This enables future tasks to find and reference similar past work.
func (e *Executor) indexTaskForSearch(task *db.Task) {
// Get transcript excerpt (first ~2000 chars of the most recent transcript)
var transcriptExcerpt string
summary, err := e.db.GetLatestCompactionSummary(task.ID)
if err == nil && summary != nil && len(summary.Summary) > 0 {
transcriptExcerpt = summary.Summary
if len(transcriptExcerpt) > 2000 {
transcriptExcerpt = transcriptExcerpt[:2000]
}
}

// Index the task
if err := e.db.IndexTaskForSearch(
task.ID,
task.Project,
task.Title,
task.Body,
task.Tags,
transcriptExcerpt,
); err != nil {
e.logger.Debug("Failed to index task for search", "task", task.ID, "error", err)
} else {
e.logger.Debug("Indexed task for search", "task", task.ID)
}
}

// getSimilarTasksSection checks if similar past tasks exist and returns a hint.
// Instead of injecting full content, we just notify Claude that the search tools are available.
Expand Down
Loading