diff --git a/apps/sim/app/api/copilot/chat/delete/route.test.ts b/apps/sim/app/api/copilot/chat/delete/route.test.ts index 0493b3ffe89..0115e793ab2 100644 --- a/apps/sim/app/api/copilot/chat/delete/route.test.ts +++ b/apps/sim/app/api/copilot/chat/delete/route.test.ts @@ -41,7 +41,7 @@ vi.mock('@/lib/copilot/chat/lifecycle', () => ({ })) vi.mock('@/lib/copilot/tasks', () => ({ - taskPubSub: { publishStatusChanged: vi.fn() }, + taskPubSub: { publishTaskListChanged: vi.fn() }, })) import { DELETE } from './route' diff --git a/apps/sim/app/api/copilot/chat/delete/route.ts b/apps/sim/app/api/copilot/chat/delete/route.ts index 1742d9e7e55..d316380c17d 100644 --- a/apps/sim/app/api/copilot/chat/delete/route.ts +++ b/apps/sim/app/api/copilot/chat/delete/route.ts @@ -41,10 +41,8 @@ export async function DELETE(request: NextRequest) { logger.info('Chat deleted', { chatId: parsed.chatId }) if (deleted.workspaceId) { - taskPubSub?.publishStatusChanged({ + taskPubSub?.publishTaskListChanged({ workspaceId: deleted.workspaceId, - chatId: parsed.chatId, - type: 'deleted', }) } diff --git a/apps/sim/app/api/copilot/chat/rename/route.ts b/apps/sim/app/api/copilot/chat/rename/route.ts index 7587f577411..d43a82783fe 100644 --- a/apps/sim/app/api/copilot/chat/rename/route.ts +++ b/apps/sim/app/api/copilot/chat/rename/route.ts @@ -44,10 +44,8 @@ export async function PATCH(request: NextRequest) { logger.info('Chat renamed', { chatId, title }) if (updated.workspaceId) { - taskPubSub?.publishStatusChanged({ + taskPubSub?.publishTaskListChanged({ workspaceId: updated.workspaceId, - chatId, - type: 'renamed', }) } diff --git a/apps/sim/app/api/copilot/chat/stop/route.ts b/apps/sim/app/api/copilot/chat/stop/route.ts index 8a742d7080e..6fbf67c8896 100644 --- a/apps/sim/app/api/copilot/chat/stop/route.ts +++ b/apps/sim/app/api/copilot/chat/stop/route.ts @@ -126,10 +126,8 @@ export async function POST(req: NextRequest) { .returning({ workspaceId: copilotChats.workspaceId }) if (updated?.workspaceId) { - taskPubSub?.publishStatusChanged({ + taskPubSub?.publishTaskListChanged({ workspaceId: updated.workspaceId, - chatId, - type: 'completed', }) } diff --git a/apps/sim/app/api/copilot/chats/route.ts b/apps/sim/app/api/copilot/chats/route.ts index b0142c27f7b..3106867ca5e 100644 --- a/apps/sim/app/api/copilot/chats/route.ts +++ b/apps/sim/app/api/copilot/chats/route.ts @@ -128,7 +128,7 @@ export async function POST(request: NextRequest) { return createInternalServerErrorResponse('Failed to create chat') } - taskPubSub?.publishStatusChanged({ workspaceId, chatId: result.chatId, type: 'created' }) + taskPubSub?.publishTaskListChanged({ workspaceId }) return NextResponse.json({ success: true, id: result.chatId }) } catch (error) { diff --git a/apps/sim/app/api/mothership/chats/[chatId]/route.ts b/apps/sim/app/api/mothership/chats/[chatId]/route.ts index e5fc73f3017..207e45987e0 100644 --- a/apps/sim/app/api/mothership/chats/[chatId]/route.ts +++ b/apps/sim/app/api/mothership/chats/[chatId]/route.ts @@ -165,10 +165,8 @@ export async function PATCH( if (updatedChat.workspaceId) { if (title !== undefined) { - taskPubSub?.publishStatusChanged({ + taskPubSub?.publishTaskListChanged({ workspaceId: updatedChat.workspaceId, - chatId, - type: 'renamed', }) captureServerEvent( userId, @@ -239,10 +237,8 @@ export async function DELETE( } if (deletedChat.workspaceId) { - taskPubSub?.publishStatusChanged({ + taskPubSub?.publishTaskListChanged({ workspaceId: deletedChat.workspaceId, - chatId, - type: 'deleted', }) captureServerEvent( userId, diff --git a/apps/sim/app/api/mothership/chats/route.ts b/apps/sim/app/api/mothership/chats/route.ts index 99bd6fd7390..93601bfb39e 100644 --- a/apps/sim/app/api/mothership/chats/route.ts +++ b/apps/sim/app/api/mothership/chats/route.ts @@ -94,7 +94,7 @@ export async function POST(request: NextRequest) { }) .returning({ id: copilotChats.id }) - taskPubSub?.publishStatusChanged({ workspaceId, chatId: chat.id, type: 'created' }) + taskPubSub?.publishTaskListChanged({ workspaceId }) captureServerEvent( userId, diff --git a/apps/sim/app/api/mothership/events/route.ts b/apps/sim/app/api/mothership/events/route.ts index 4f1646f6e34..52b3343edea 100644 --- a/apps/sim/app/api/mothership/events/route.ts +++ b/apps/sim/app/api/mothership/events/route.ts @@ -1,8 +1,8 @@ /** * SSE endpoint for task status events. * - * Pushes `task_status` events to the browser when tasks are - * started, completed, created, deleted, or renamed. + * Pushes `task_status` events to the browser when the workspace task list + * changes. * * Auth is handled via session cookies (EventSource sends cookies automatically). */ @@ -18,13 +18,9 @@ export const GET = createWorkspaceSSE({ { subscribe: (workspaceId, send) => { if (!taskPubSub) return () => {} - return taskPubSub.onStatusChanged((event) => { + return taskPubSub.onTaskListChanged((event) => { if (event.workspaceId !== workspaceId) return - send('task_status', { - chatId: event.chatId, - type: event.type, - timestamp: Date.now(), - }) + send('task_status', {}) }) }, }, diff --git a/apps/sim/app/workspace/[workspaceId]/home/hooks/use-chat.ts b/apps/sim/app/workspace/[workspaceId]/home/hooks/use-chat.ts index dea4eaacc12..a99e1ab8409 100644 --- a/apps/sim/app/workspace/[workspaceId]/home/hooks/use-chat.ts +++ b/apps/sim/app/workspace/[workspaceId]/home/hooks/use-chat.ts @@ -2622,16 +2622,30 @@ export function useChat( [] ) - const invalidateChatQueries = useCallback(() => { - const activeChatId = chatIdRef.current - if (activeChatId) { + const invalidateTaskHistory = useCallback( + (refetchType: 'active' | 'none' = 'active') => { + const activeChatId = chatIdRef.current + if (!activeChatId) { + return + } + queryClient.invalidateQueries({ queryKey: taskKeys.detail(activeChatId), + refetchType, }) - } + }, + [queryClient] + ) + + const invalidateTaskList = useCallback(() => { queryClient.invalidateQueries({ queryKey: taskKeys.list(workspaceId) }) }, [workspaceId, queryClient]) + const invalidateChatQueries = useCallback(() => { + invalidateTaskHistory() + invalidateTaskList() + }, [invalidateTaskHistory, invalidateTaskList]) + const messagesRef = useRef(messages) messagesRef.current = messages @@ -2643,7 +2657,8 @@ export function useChat( clearActiveTurn() setTransportIdle() abortControllerRef.current = null - invalidateChatQueries() + invalidateTaskHistory('none') + invalidateTaskList() if (!options?.error) { const cid = chatIdRef.current @@ -2660,7 +2675,13 @@ export function useChat( void enqueueQueueDispatchRef.current({ type: 'send_head' }) } }, - [clearActiveTurn, invalidateChatQueries, reconcileTerminalPreviewSessions, setTransportIdle] + [ + clearActiveTurn, + invalidateTaskHistory, + invalidateTaskList, + reconcileTerminalPreviewSessions, + setTransportIdle, + ] ) finalizeRef.current = finalize diff --git a/apps/sim/hooks/use-task-events.test.ts b/apps/sim/hooks/use-task-events.test.ts index ac58e6cf27d..cd2ce022551 100644 --- a/apps/sim/hooks/use-task-events.test.ts +++ b/apps/sim/hooks/use-task-events.test.ts @@ -16,44 +16,8 @@ describe('handleTaskStatusEvent', () => { vi.clearAllMocks() }) - it('invalidates the task list and completed chat detail', () => { - handleTaskStatusEvent( - queryClient, - JSON.stringify({ - chatId: 'chat-1', - type: 'completed', - timestamp: Date.now(), - }) - ) - - expect(queryClient.invalidateQueries).toHaveBeenCalledTimes(2) - expect(queryClient.invalidateQueries).toHaveBeenNthCalledWith(1, { - queryKey: taskKeys.lists(), - }) - expect(queryClient.invalidateQueries).toHaveBeenNthCalledWith(2, { - queryKey: taskKeys.detail('chat-1'), - }) - }) - - it('keeps list invalidation only for non-completed task events', () => { - handleTaskStatusEvent( - queryClient, - JSON.stringify({ - chatId: 'chat-1', - type: 'started', - timestamp: Date.now(), - }) - ) - - expect(queryClient.invalidateQueries).toHaveBeenCalledTimes(1) - expect(queryClient.invalidateQueries).toHaveBeenCalledWith({ - queryKey: taskKeys.lists(), - }) - }) - - it('preserves list invalidation when task event payload is invalid', () => { - handleTaskStatusEvent(queryClient, '{') - + it('invalidates the task list for task status updates', () => { + handleTaskStatusEvent(queryClient) expect(queryClient.invalidateQueries).toHaveBeenCalledTimes(1) expect(queryClient.invalidateQueries).toHaveBeenCalledWith({ queryKey: taskKeys.lists(), diff --git a/apps/sim/hooks/use-task-events.ts b/apps/sim/hooks/use-task-events.ts index 04bff3df49c..74ff3915226 100644 --- a/apps/sim/hooks/use-task-events.ts +++ b/apps/sim/hooks/use-task-events.ts @@ -6,55 +6,12 @@ import { taskKeys } from '@/hooks/queries/tasks' const logger = createLogger('TaskEvents') -interface TaskStatusEventPayload { - chatId?: string - type?: 'started' | 'completed' | 'created' | 'deleted' | 'renamed' -} - -function parseTaskStatusEventPayload(data: unknown): TaskStatusEventPayload | null { - let parsed = data - - if (typeof parsed === 'string') { - try { - parsed = JSON.parse(parsed) - } catch { - return null - } - } - - if (!parsed || typeof parsed !== 'object' || Array.isArray(parsed)) { - return null - } - - const record = parsed as Record - - return { - ...(typeof record.chatId === 'string' ? { chatId: record.chatId } : {}), - ...(typeof record.type === 'string' - ? { type: record.type as TaskStatusEventPayload['type'] } - : {}), - } -} - -export function handleTaskStatusEvent( - queryClient: Pick, - data: unknown -): void { +export function handleTaskStatusEvent(queryClient: Pick): void { queryClient.invalidateQueries({ queryKey: taskKeys.lists() }) - - const payload = parseTaskStatusEventPayload(data) - if (!payload) { - logger.warn('Received invalid task_status payload') - return - } - - if (payload.type === 'completed' && payload.chatId) { - queryClient.invalidateQueries({ queryKey: taskKeys.detail(payload.chatId) }) - } } /** - * Subscribes to task status SSE events and invalidates task caches on changes. + * Subscribes to task status SSE events and refreshes the task list on changes. */ export function useTaskEvents(workspaceId: string | undefined) { const queryClient = useQueryClient() @@ -66,8 +23,8 @@ export function useTaskEvents(workspaceId: string | undefined) { `/api/mothership/events?workspaceId=${encodeURIComponent(workspaceId)}` ) - eventSource.addEventListener('task_status', (event) => { - handleTaskStatusEvent(queryClient, event instanceof MessageEvent ? event.data : undefined) + eventSource.addEventListener('task_status', () => { + handleTaskStatusEvent(queryClient) }) eventSource.onerror = () => { diff --git a/apps/sim/lib/copilot/chat/post.test.ts b/apps/sim/lib/copilot/chat/post.test.ts index c2c884c96de..75475c5739e 100644 --- a/apps/sim/lib/copilot/chat/post.test.ts +++ b/apps/sim/lib/copilot/chat/post.test.ts @@ -84,7 +84,7 @@ vi.mock('@/lib/copilot/chat/lifecycle', () => ({ vi.mock('@/lib/copilot/tasks', () => ({ taskPubSub: { - publishStatusChanged: vi.fn(), + publishTaskListChanged: vi.fn(), }, })) diff --git a/apps/sim/lib/copilot/chat/post.ts b/apps/sim/lib/copilot/chat/post.ts index b15e84db694..4300a82b958 100644 --- a/apps/sim/lib/copilot/chat/post.ts +++ b/apps/sim/lib/copilot/chat/post.ts @@ -284,7 +284,7 @@ async function persistUserMessage(params: { .returning({ messages: copilotChats.messages }) if (notifyWorkspaceStatus && updated && workspaceId) { - taskPubSub?.publishStatusChanged({ workspaceId, chatId, type: 'started' }) + taskPubSub?.publishTaskListChanged({ workspaceId }) } return Array.isArray(updated?.messages) ? updated.messages : undefined @@ -348,11 +348,7 @@ function buildOnComplete(params: { }) if (notifyWorkspaceStatus && workspaceId) { - taskPubSub?.publishStatusChanged({ - workspaceId, - chatId, - type: 'completed', - }) + taskPubSub?.publishTaskListChanged({ workspaceId }) } } catch (error) { logger.error(`[${requestId}] Failed to persist chat messages`, { @@ -379,11 +375,7 @@ function buildOnError(params: { await finalizeAssistantTurn({ chatId, userMessageId }) if (notifyWorkspaceStatus && workspaceId) { - taskPubSub?.publishStatusChanged({ - workspaceId, - chatId, - type: 'completed', - }) + taskPubSub?.publishTaskListChanged({ workspaceId }) } } catch (error) { logger.error(`[${requestId}] Failed to finalize errored chat stream`, { diff --git a/apps/sim/lib/copilot/request/lifecycle/start.ts b/apps/sim/lib/copilot/request/lifecycle/start.ts index caf6fb4df87..7e2be717917 100644 --- a/apps/sim/lib/copilot/request/lifecycle/start.ts +++ b/apps/sim/lib/copilot/request/lifecycle/start.ts @@ -269,7 +269,7 @@ function fireTitleGeneration(params: { payload: { kind: MothershipStreamV1SessionKind.title, title }, }) if (workspaceId) { - taskPubSub?.publishStatusChanged({ workspaceId, chatId, type: 'renamed' }) + taskPubSub?.publishTaskListChanged({ workspaceId }) } }) .catch((error) => { diff --git a/apps/sim/lib/copilot/tasks.ts b/apps/sim/lib/copilot/tasks.ts index 5828a711cb4..272d63abc94 100644 --- a/apps/sim/lib/copilot/tasks.ts +++ b/apps/sim/lib/copilot/tasks.ts @@ -1,29 +1,29 @@ /** - * Task Status Pub/Sub Adapter + * Task List Change Pub/Sub Adapter * - * Broadcasts task status events across processes using Redis Pub/Sub. - * Gracefully falls back to process-local EventEmitter when Redis is unavailable. + * Broadcasts workspace-scoped task list change notifications across processes + * using Redis Pub/Sub. Gracefully falls back to a process-local EventEmitter + * when Redis is unavailable. * * Channel: `task:status_changed` */ import { createPubSubChannel } from '@/lib/events/pubsub' -interface TaskStatusEvent { +interface TaskListChangedEvent { workspaceId: string - chatId: string - type: 'started' | 'completed' | 'created' | 'deleted' | 'renamed' } const channel = typeof window !== 'undefined' ? null - : createPubSubChannel({ channel: 'task:status_changed', label: 'task' }) + : createPubSubChannel({ channel: 'task:status_changed', label: 'task' }) export const taskPubSub = channel ? { - publishStatusChanged: (event: TaskStatusEvent) => channel.publish(event), - onStatusChanged: (handler: (event: TaskStatusEvent) => void) => channel.subscribe(handler), + publishTaskListChanged: (event: TaskListChangedEvent) => channel.publish(event), + onTaskListChanged: (handler: (event: TaskListChangedEvent) => void) => + channel.subscribe(handler), dispose: () => channel.dispose(), } : null diff --git a/apps/sim/lib/mothership/inbox/executor.ts b/apps/sim/lib/mothership/inbox/executor.ts index 8eaace6c3b5..9d77131069a 100644 --- a/apps/sim/lib/mothership/inbox/executor.ts +++ b/apps/sim/lib/mothership/inbox/executor.ts @@ -113,10 +113,8 @@ export async function executeInboxTask(taskId: string): Promise { .then(async (title) => { if (title && chatId) { await db.update(copilotChats).set({ title }).where(eq(copilotChats.id, chatId)) - taskPubSub?.publishStatusChanged({ + taskPubSub?.publishTaskListChanged({ workspaceId: ws.id, - chatId, - type: 'renamed', }) } }) @@ -124,18 +122,14 @@ export async function executeInboxTask(taskId: string): Promise { logger.warn('Failed to generate chat title', { chatId, err }) }) - taskPubSub?.publishStatusChanged({ + taskPubSub?.publishTaskListChanged({ workspaceId: ws.id, - chatId, - type: 'created', }) } if (chatId) { - taskPubSub?.publishStatusChanged({ + taskPubSub?.publishTaskListChanged({ workspaceId: ws.id, - chatId, - type: 'started', }) } @@ -240,10 +234,8 @@ export async function executeInboxTask(taskId: string): Promise { .where(eq(mothershipInboxTask.id, taskId)) if (chatId) { - taskPubSub?.publishStatusChanged({ + taskPubSub?.publishTaskListChanged({ workspaceId: ws.id, - chatId, - type: 'completed', }) }