ProcessPoolExecutor fails for sync tasks: cannot pickle '_contextvars.Context' object #611

@joyceleeor

Description

### Taskiq version

0.12.1

### Python version

Python 3.13

### OS

Windows 11

### What happened?

When using the `--use-process-pool` flag to execute synchronous tasks, every task fails with `TypeError: cannot pickle '_contextvars.Context' object`.

This happens regardless of whether the user's code uses contextvars at all.

### Root Cause

In taskiq/receiver/receiver.py (around line 250), synchronous functions are dispatched like this:

```python
ctx = contextvars.copy_context()
func = functools.partial(target, *message.args, **kwargs)
target_future = loop.run_in_executor(
    self.executor,  # ProcessPoolExecutor
    ctx.run,        # Problem: the bound method holds a Context reference
    func,
)
```
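The failure can be reproduced without taskiq at all. A minimal sketch of why the dispatch above breaks: `ProcessPoolExecutor` must pickle the callable it sends to a worker process, and pickling the bound method `ctx.run` forces pickle to serialize the `Context` it is bound to, which does not support pickling.

```python
import contextvars
import pickle

ctx = contextvars.copy_context()

# Pickling the bound method ctx.run requires pickling ctx itself,
# and Context objects cannot be pickled.
try:
    pickle.dumps(ctx.run)
except TypeError as exc:
    print(exc)  # e.g. cannot pickle '_contextvars.Context' object
```

This matches the error in the log output below, even though the user's task function itself is perfectly picklable.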


### Relevant log output

```shell
{"timestamp": "2026-04-24T09:24:33.080765+00:00", "level": "ERROR", "log_type": "general", "message": "Failed to process document task\nTraceback (most recent call last):\n  File \"app\\tasks\\doc_process.py\", line 91, in doc_process_task\n    final_state = await pipeline.run(initial_state)\n  File \"app\\pipeline\\pipeline.py\", line 243, in run\n    final_state = await self.graph.ainvoke(cast(PipelineState, initial_state))\n  File \".venv\\Lib\\site-packages\\langgraph\\pregel\\main.py\", line 3479, in ainvoke\n    async for chunk in self.astream(...)\n  File \".venv\\Lib\\site-packages\\langgraph\\pregel\\_runner.py\", line 304, in atick\n    await arun_with_retry(...)\n  File \".venv\\Lib\\site-packages\\langgraph\\pregel\\_retry.py\", line 242, in arun_with_retry\n    return await task.proc.ainvoke(task.input, config)\n  File \"app\\pipeline\\pipeline.py\", line 182, in _other_chunking_node\n    return cast(PipelineState, await self.other_chunking_strategy.execute(state))\n  File \"app\\pipeline\\chunking.py\", line 67, in execute\n    raise Exception(f\"Chunking task failed: {error_msg}\")\nException: Chunking task failed: cannot pickle '_contextvars.Context' object\n"}
```

### Broker initialization code

```python
from taskiq_redis import RedisAsyncResultBackend, RedisStreamBroker
from taskiq.middlewares import SimpleRetryMiddleware

result_backend = RedisAsyncResultBackend(redis_url="redis://localhost")

broker = RedisStreamBroker(
    url="redis://localhost",
    queue_name="io_queue",
    additional_streams={"cpu_queue": ">"},
).with_result_backend(result_backend).with_middlewares(
    SimpleRetryMiddleware(default_retry_count=2)
)

# Task definition
@broker.task(
    task_name="chunking_task",
    queue_name="cpu_queue",
    timeout=300,
    retry_on_error=True,
    max_retries=1,
)
def chunking_task(text: str, chunk_size: int = 1000) -> list[str]:
    """Pure sync task - text splitting."""
    from langchain_text_splitters import RecursiveCharacterTextSplitter

    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=chunk_size,
        chunk_overlap=200,
    )
    return text_splitter.split_text(text)

# Worker command:
# taskiq worker app.core.taskiq:broker --use-process-pool --max-process-pool-processes 3
```
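A user-side workaround until this is fixed, assuming the task can be made async: dispatch the CPU-bound work to your own `ProcessPoolExecutor` from inside an async task, so taskiq's sync path (and its `ctx.run` wrapper) is never involved. A sketch, independent of taskiq (`split_text` is a stand-in for the real splitter):

```python
import asyncio
from concurrent.futures import ProcessPoolExecutor

def split_text(text: str, chunk_size: int = 1000) -> list[str]:
    # Plain module-level function: picklable, no Context involved.
    return [text[i:i + chunk_size] for i in range(0, len(text), chunk_size)]

async def chunking_task_async(text: str, chunk_size: int = 1000) -> list[str]:
    loop = asyncio.get_running_loop()
    with ProcessPoolExecutor(max_workers=3) as pool:
        # Pass the function and its arguments directly -- no ctx.run wrapper,
        # so only picklable objects cross the process boundary.
        return await loop.run_in_executor(pool, split_text, text, chunk_size)

if __name__ == "__main__":
    # Guard is required on Windows, where child processes are spawned.
    chunks = asyncio.run(chunking_task_async("a" * 2500, 1000))
    print(len(chunks))  # 2500 chars in 1000-char chunks -> 3 chunks
```

Since async tasks are not routed through the executor, the `--use-process-pool` flag can then be dropped for this worker.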
