Taskiq version
0.12.1
Python version
Python 3.13
OS
Windows 11
What happened?
When the worker is started with the --use-process-pool flag to execute synchronous tasks, every task fails with TypeError: cannot pickle '_contextvars.Context' object.
This happens regardless of whether user code uses contextvars or not.
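The claim above can be checked without taskiq at all. The following is a minimal sketch, assuming only the standard library; `context_is_picklable` is a hypothetical helper name used for illustration. It copies the current context, which is empty of user-defined variables here, and tries to pickle it.

```python
import contextvars
import pickle


def context_is_picklable() -> bool:
    """Try to pickle a copy of the current context, as a process pool would."""
    ctx = contextvars.copy_context()
    try:
        pickle.dumps(ctx)
    except TypeError:
        # Context objects do not support pickling, even when no
        # ContextVar has been set by user code.
        return False
    return True


print(context_is_picklable())  # False
```

Because the failure comes from the Context object itself, it does not matter whether the task body ever touches contextvars.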
Root Cause
In taskiq/receiver/receiver.py (around line 250), for synchronous functions:
```python
ctx = contextvars.copy_context()
func = functools.partial(target, *message.args, **kwargs)
target_future = loop.run_in_executor(
    self.executor,  # ProcessPoolExecutor
    ctx.run,  # Problem: bound method holds Context reference
    func,
)
```
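One possible direction for a fix, sketched under assumptions: skip `ctx.run` when the executor is a process pool and submit the `functools.partial` directly. `build_call` and `can_pickle` below are hypothetical helpers, not taskiq APIs, and `abs` stands in for a picklable task function. This is an illustration of the idea rather than a proposed patch.

```python
import contextvars
import functools
import pickle
from typing import Any, Callable


def can_pickle(obj: Any) -> bool:
    """Return True if obj survives pickle.dumps, which a process pool requires."""
    try:
        pickle.dumps(obj)
    except (TypeError, AttributeError, pickle.PicklingError):
        return False
    return True


def build_call(
    use_process_pool: bool,
    target: Callable[..., Any],
    *args: Any,
    **kwargs: Any,
) -> tuple:
    """Hypothetical helper: build the callable and arguments for run_in_executor."""
    func = functools.partial(target, *args, **kwargs)
    if use_process_pool:
        # Submit the partial on its own; ctx.run would drag the
        # unpicklable Context along with it.
        return (func,)
    # Thread pools share memory, so wrapping in ctx.run is safe there.
    ctx = contextvars.copy_context()
    return (ctx.run, func)


thread_call = build_call(False, abs, -3)
process_call = build_call(True, abs, -3)
print(all(can_pickle(part) for part in thread_call))   # False
print(all(can_pickle(part) for part in process_call))  # True
```

The trade-off is that the task no longer sees the parent's context variables inside the worker process. Since the Context cannot be pickled, those values could not cross the process boundary through `ctx.run` in the first place.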
### Relevant log output
```shell
### Log output (optional)
{"timestamp": "2026-04-24T09:24:33.080765+00:00", "level": "ERROR", "log_type": "general", "message": "Failed to process document task\nTraceback (most recent call last):\n File \"app\\tasks\\doc_process.py\", line 91, in doc_process_task\n final_state = await pipeline.run(initial_state)\n File \"app\\pipeline\\pipeline.py\", line 243, in run\n final_state = await self.graph.ainvoke(cast(PipelineState, initial_state))\n File \".venv\\Lib\\site-packages\\langgraph\\pregel\\main.py\", line 3479, in ainvoke\n async for chunk in self.astream(...)\n File \".venv\\Lib\\site-packages\\langgraph\\pregel\\_runner.py\", line 304, in atick\n await arun_with_retry(...)\n File \".venv\\Lib\\site-packages\\langgraph\\pregel\\_retry.py\", line 242, in arun_with_retry\n return await task.proc.ainvoke(task.input, config)\n File \"app\\pipeline\\pipeline.py\", line 182, in _other_chunking_node\n return cast(PipelineState, await self.other_chunking_strategy.execute(state))\n File \"app\\pipeline\\chunking.py\", line 67, in execute\n raise Exception(f\"Chunking task failed: {error_msg}\")\nException: Chunking task failed: cannot pickle '_contextvars.Context' object\n"}
```
Broker initialization code
```python
from taskiq_redis import RedisAsyncResultBackend, RedisStreamBroker
from taskiq.middlewares import SimpleRetryMiddleware

result_backend = RedisAsyncResultBackend(redis_url="redis://localhost")
broker = RedisStreamBroker(
    url="redis://localhost",
    queue_name="io_queue",
    additional_streams={"cpu_queue": ">"},
).with_result_backend(result_backend).with_middlewares(
    SimpleRetryMiddleware(default_retry_count=2)
)


# Task definition
@broker.task(
    task_name="chunking_task",
    queue_name="cpu_queue",
    timeout=300,
    retry_on_error=True,
    max_retries=1
)
def chunking_task(text: str, chunk_size: int = 1000) -> list[str]:
    """Pure sync task - text splitting"""
    from langchain_text_splitters import RecursiveCharacterTextSplitter

    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=chunk_size,
        chunk_overlap=200,
    )
    return text_splitter.split_text(text)


# Worker command:
# taskiq worker app.core.taskiq:broker --use-process-pool --max-process-pool-processes 3
```