Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion apps/sim/app/api/copilot/chat/delete/route.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ vi.mock('@/lib/copilot/chat/lifecycle', () => ({
}))

vi.mock('@/lib/copilot/tasks', () => ({
taskPubSub: { publishStatusChanged: vi.fn() },
taskPubSub: { publishTaskListChanged: vi.fn() },
}))

import { DELETE } from './route'
Expand Down
4 changes: 1 addition & 3 deletions apps/sim/app/api/copilot/chat/delete/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,8 @@ export async function DELETE(request: NextRequest) {
logger.info('Chat deleted', { chatId: parsed.chatId })

if (deleted.workspaceId) {
taskPubSub?.publishStatusChanged({
taskPubSub?.publishTaskListChanged({
workspaceId: deleted.workspaceId,
chatId: parsed.chatId,
type: 'deleted',
})
}

Expand Down
4 changes: 1 addition & 3 deletions apps/sim/app/api/copilot/chat/rename/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,8 @@ export async function PATCH(request: NextRequest) {
logger.info('Chat renamed', { chatId, title })

if (updated.workspaceId) {
taskPubSub?.publishStatusChanged({
taskPubSub?.publishTaskListChanged({
workspaceId: updated.workspaceId,
chatId,
type: 'renamed',
})
}

Expand Down
4 changes: 1 addition & 3 deletions apps/sim/app/api/copilot/chat/stop/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -126,10 +126,8 @@ export async function POST(req: NextRequest) {
.returning({ workspaceId: copilotChats.workspaceId })

if (updated?.workspaceId) {
taskPubSub?.publishStatusChanged({
taskPubSub?.publishTaskListChanged({
workspaceId: updated.workspaceId,
chatId,
type: 'completed',
})
}

Expand Down
2 changes: 1 addition & 1 deletion apps/sim/app/api/copilot/chats/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ export async function POST(request: NextRequest) {
return createInternalServerErrorResponse('Failed to create chat')
}

taskPubSub?.publishStatusChanged({ workspaceId, chatId: result.chatId, type: 'created' })
taskPubSub?.publishTaskListChanged({ workspaceId })

return NextResponse.json({ success: true, id: result.chatId })
} catch (error) {
Expand Down
8 changes: 2 additions & 6 deletions apps/sim/app/api/mothership/chats/[chatId]/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -165,10 +165,8 @@ export async function PATCH(

if (updatedChat.workspaceId) {
if (title !== undefined) {
taskPubSub?.publishStatusChanged({
taskPubSub?.publishTaskListChanged({
workspaceId: updatedChat.workspaceId,
chatId,
type: 'renamed',
})
captureServerEvent(
userId,
Expand Down Expand Up @@ -239,10 +237,8 @@ export async function DELETE(
}

if (deletedChat.workspaceId) {
taskPubSub?.publishStatusChanged({
taskPubSub?.publishTaskListChanged({
workspaceId: deletedChat.workspaceId,
chatId,
type: 'deleted',
})
captureServerEvent(
userId,
Expand Down
2 changes: 1 addition & 1 deletion apps/sim/app/api/mothership/chats/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ export async function POST(request: NextRequest) {
})
.returning({ id: copilotChats.id })

taskPubSub?.publishStatusChanged({ workspaceId, chatId: chat.id, type: 'created' })
taskPubSub?.publishTaskListChanged({ workspaceId })

captureServerEvent(
userId,
Expand Down
12 changes: 4 additions & 8 deletions apps/sim/app/api/mothership/events/route.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
/**
* SSE endpoint for task status events.
*
* Pushes `task_status` events to the browser when tasks are
* started, completed, created, deleted, or renamed.
* Pushes `task_status` events to the browser when the workspace task list
* changes.
*
* Auth is handled via session cookies (EventSource sends cookies automatically).
*/
Expand All @@ -18,13 +18,9 @@ export const GET = createWorkspaceSSE({
{
subscribe: (workspaceId, send) => {
if (!taskPubSub) return () => {}
return taskPubSub.onStatusChanged((event) => {
return taskPubSub.onTaskListChanged((event) => {
if (event.workspaceId !== workspaceId) return
send('task_status', {
chatId: event.chatId,
type: event.type,
timestamp: Date.now(),
})
send('task_status', {})
})
},
},
Expand Down
33 changes: 27 additions & 6 deletions apps/sim/app/workspace/[workspaceId]/home/hooks/use-chat.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2622,16 +2622,30 @@ export function useChat(
[]
)

const invalidateChatQueries = useCallback(() => {
const activeChatId = chatIdRef.current
if (activeChatId) {
const invalidateTaskHistory = useCallback(
(refetchType: 'active' | 'none' = 'active') => {
const activeChatId = chatIdRef.current
if (!activeChatId) {
return
}

queryClient.invalidateQueries({
queryKey: taskKeys.detail(activeChatId),
refetchType,
})
}
},
[queryClient]
)

const invalidateTaskList = useCallback(() => {
queryClient.invalidateQueries({ queryKey: taskKeys.list(workspaceId) })
}, [workspaceId, queryClient])

const invalidateChatQueries = useCallback(() => {
invalidateTaskHistory()
invalidateTaskList()
}, [invalidateTaskHistory, invalidateTaskList])

const messagesRef = useRef(messages)
messagesRef.current = messages

Expand All @@ -2643,7 +2657,8 @@ export function useChat(
clearActiveTurn()
setTransportIdle()
abortControllerRef.current = null
invalidateChatQueries()
invalidateTaskHistory('none')
invalidateTaskList()

if (!options?.error) {
const cid = chatIdRef.current
Expand All @@ -2660,7 +2675,13 @@ export function useChat(
void enqueueQueueDispatchRef.current({ type: 'send_head' })
}
},
[clearActiveTurn, invalidateChatQueries, reconcileTerminalPreviewSessions, setTransportIdle]
[
clearActiveTurn,
invalidateTaskHistory,
invalidateTaskList,
reconcileTerminalPreviewSessions,
setTransportIdle,
]
)
finalizeRef.current = finalize

Expand Down
40 changes: 2 additions & 38 deletions apps/sim/hooks/use-task-events.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,44 +16,8 @@ describe('handleTaskStatusEvent', () => {
vi.clearAllMocks()
})

it('invalidates the task list and completed chat detail', () => {
handleTaskStatusEvent(
queryClient,
JSON.stringify({
chatId: 'chat-1',
type: 'completed',
timestamp: Date.now(),
})
)

expect(queryClient.invalidateQueries).toHaveBeenCalledTimes(2)
expect(queryClient.invalidateQueries).toHaveBeenNthCalledWith(1, {
queryKey: taskKeys.lists(),
})
expect(queryClient.invalidateQueries).toHaveBeenNthCalledWith(2, {
queryKey: taskKeys.detail('chat-1'),
})
})

it('keeps list invalidation only for non-completed task events', () => {
handleTaskStatusEvent(
queryClient,
JSON.stringify({
chatId: 'chat-1',
type: 'started',
timestamp: Date.now(),
})
)

expect(queryClient.invalidateQueries).toHaveBeenCalledTimes(1)
expect(queryClient.invalidateQueries).toHaveBeenCalledWith({
queryKey: taskKeys.lists(),
})
})

it('preserves list invalidation when task event payload is invalid', () => {
handleTaskStatusEvent(queryClient, '{')

it('invalidates the task list for task status updates', () => {
handleTaskStatusEvent(queryClient)
expect(queryClient.invalidateQueries).toHaveBeenCalledTimes(1)
expect(queryClient.invalidateQueries).toHaveBeenCalledWith({
queryKey: taskKeys.lists(),
Expand Down
51 changes: 4 additions & 47 deletions apps/sim/hooks/use-task-events.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,55 +6,12 @@ import { taskKeys } from '@/hooks/queries/tasks'

const logger = createLogger('TaskEvents')

interface TaskStatusEventPayload {
chatId?: string
type?: 'started' | 'completed' | 'created' | 'deleted' | 'renamed'
}

function parseTaskStatusEventPayload(data: unknown): TaskStatusEventPayload | null {
let parsed = data

if (typeof parsed === 'string') {
try {
parsed = JSON.parse(parsed)
} catch {
return null
}
}

if (!parsed || typeof parsed !== 'object' || Array.isArray(parsed)) {
return null
}

const record = parsed as Record<string, unknown>

return {
...(typeof record.chatId === 'string' ? { chatId: record.chatId } : {}),
...(typeof record.type === 'string'
? { type: record.type as TaskStatusEventPayload['type'] }
: {}),
}
}

export function handleTaskStatusEvent(
queryClient: Pick<QueryClient, 'invalidateQueries'>,
data: unknown
): void {
export function handleTaskStatusEvent(queryClient: Pick<QueryClient, 'invalidateQueries'>): void {
queryClient.invalidateQueries({ queryKey: taskKeys.lists() })

const payload = parseTaskStatusEventPayload(data)
if (!payload) {
logger.warn('Received invalid task_status payload')
return
}

if (payload.type === 'completed' && payload.chatId) {
queryClient.invalidateQueries({ queryKey: taskKeys.detail(payload.chatId) })
}
}

/**
* Subscribes to task status SSE events and invalidates task caches on changes.
* Subscribes to task status SSE events and refreshes the task list on changes.
*/
export function useTaskEvents(workspaceId: string | undefined) {
const queryClient = useQueryClient()
Expand All @@ -66,8 +23,8 @@ export function useTaskEvents(workspaceId: string | undefined) {
`/api/mothership/events?workspaceId=${encodeURIComponent(workspaceId)}`
)

eventSource.addEventListener('task_status', (event) => {
handleTaskStatusEvent(queryClient, event instanceof MessageEvent ? event.data : undefined)
eventSource.addEventListener('task_status', () => {
handleTaskStatusEvent(queryClient)
})

eventSource.onerror = () => {
Expand Down
2 changes: 1 addition & 1 deletion apps/sim/lib/copilot/chat/post.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ vi.mock('@/lib/copilot/chat/lifecycle', () => ({

vi.mock('@/lib/copilot/tasks', () => ({
taskPubSub: {
publishStatusChanged: vi.fn(),
publishTaskListChanged: vi.fn(),
},
}))

Expand Down
14 changes: 3 additions & 11 deletions apps/sim/lib/copilot/chat/post.ts
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,7 @@ async function persistUserMessage(params: {
.returning({ messages: copilotChats.messages })

if (notifyWorkspaceStatus && updated && workspaceId) {
taskPubSub?.publishStatusChanged({ workspaceId, chatId, type: 'started' })
taskPubSub?.publishTaskListChanged({ workspaceId })
}

return Array.isArray(updated?.messages) ? updated.messages : undefined
Expand Down Expand Up @@ -348,11 +348,7 @@ function buildOnComplete(params: {
})

if (notifyWorkspaceStatus && workspaceId) {
taskPubSub?.publishStatusChanged({
workspaceId,
chatId,
type: 'completed',
})
taskPubSub?.publishTaskListChanged({ workspaceId })
}
} catch (error) {
logger.error(`[${requestId}] Failed to persist chat messages`, {
Expand All @@ -379,11 +375,7 @@ function buildOnError(params: {
await finalizeAssistantTurn({ chatId, userMessageId })

if (notifyWorkspaceStatus && workspaceId) {
taskPubSub?.publishStatusChanged({
workspaceId,
chatId,
type: 'completed',
})
taskPubSub?.publishTaskListChanged({ workspaceId })
}
} catch (error) {
logger.error(`[${requestId}] Failed to finalize errored chat stream`, {
Expand Down
2 changes: 1 addition & 1 deletion apps/sim/lib/copilot/request/lifecycle/start.ts
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,7 @@ function fireTitleGeneration(params: {
payload: { kind: MothershipStreamV1SessionKind.title, title },
})
if (workspaceId) {
taskPubSub?.publishStatusChanged({ workspaceId, chatId, type: 'renamed' })
taskPubSub?.publishTaskListChanged({ workspaceId })
}
})
.catch((error) => {
Expand Down
18 changes: 9 additions & 9 deletions apps/sim/lib/copilot/tasks.ts
Original file line number Diff line number Diff line change
@@ -1,29 +1,29 @@
/**
* Task Status Pub/Sub Adapter
* Task List Change Pub/Sub Adapter
*
* Broadcasts task status events across processes using Redis Pub/Sub.
* Gracefully falls back to process-local EventEmitter when Redis is unavailable.
* Broadcasts workspace-scoped task list change notifications across processes
* using Redis Pub/Sub. Gracefully falls back to a process-local EventEmitter
* when Redis is unavailable.
*
* Channel: `task:status_changed`
*/

import { createPubSubChannel } from '@/lib/events/pubsub'

interface TaskStatusEvent {
interface TaskListChangedEvent {
workspaceId: string
chatId: string
type: 'started' | 'completed' | 'created' | 'deleted' | 'renamed'
}

const channel =
typeof window !== 'undefined'
? null
: createPubSubChannel<TaskStatusEvent>({ channel: 'task:status_changed', label: 'task' })
: createPubSubChannel<TaskListChangedEvent>({ channel: 'task:status_changed', label: 'task' })

export const taskPubSub = channel
? {
publishStatusChanged: (event: TaskStatusEvent) => channel.publish(event),
onStatusChanged: (handler: (event: TaskStatusEvent) => void) => channel.subscribe(handler),
publishTaskListChanged: (event: TaskListChangedEvent) => channel.publish(event),
onTaskListChanged: (handler: (event: TaskListChangedEvent) => void) =>
channel.subscribe(handler),
dispose: () => channel.dispose(),
}
: null
Loading
Loading