diff --git a/README.md b/README.md
index a0000ec..477af6a 100644
--- a/README.md
+++ b/README.md
@@ -10,7 +10,7 @@
@@ -39,16 +39,18 @@ AgentFly is an extensible framework for building LLM agents with reinforcement l
 ## News
 
-**08/2025 Multi-Modal (Vision) Agent Training Support** - Thanks to the powerful template system, AgentFly now supports training vision-language agents! Train agents that can see and understand visual content, including GUI automation and image-based QA. See our [predefined training examples](docs/examples/predefined_training_examples.md) for ready-to-use scripts.
+**12/2025 verl update**: Updated verl to 0.6.x version.
+
+**08/2025 Multi-Modal (Vision) Agent Training Support**: Thanks to the powerful template system, AgentFly now supports training vision-language agents! Train agents that can see and understand visual content, including GUI automation and image-based QA. See our [predefined training examples](docs/examples/predefined_training_examples.md) for ready-to-use scripts.
 
 ---
 
-**08/2025 Chat Template System** - A flexible framework for creating conversation templates with multi-model support, vision capabilities, and tool integration. [Learn more →](docs/chat_template/)
+**08/2025 Chat Template System**: A flexible framework for creating conversation templates with multi-model support, vision capabilities, and tool integration. [Learn more →](docs/chat_template/)
 
 ## Installation
 
 **Option 1**: One-line Installation:
 ```
-bash install.sh # Assume conda with python3.10.x
+bash install.sh # Assume conda with python3.12.x
 ```
 
 **Option 2**: Customized Installation
@@ -144,7 +146,7 @@ During training, `question` will be used to format the input messages, while oth
 #### 2. Tools & Rewards
 You can use any existing tool, which is listed in the [documentation](https://agentfly.readthedocs.io/), or define a tool by decorating it with `@tool`. The output should either be a string, or a dictionary containing `observation` as a key.
 ```python
-@reward(name="customized_tool")
+@tool(name="customized_tool")
 def customized_tool(arg1, arg2):
     # tool logic here
 ```
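For readers skimming the patch, here is a minimal sketch of the custom-tool pattern the README hunk above describes. The import path and the exact `@tool` signature are assumptions for illustration, not taken from this diff:

```python
# Hypothetical example of the @tool pattern described in the README hunk above.
# The import path and decorator signature are assumed, not confirmed by this diff.
from agentfly.tools import tool  # assumed import path

@tool(name="weather_lookup")
def weather_lookup(city: str):
    # Tool logic goes here. Per the README, the return value can be a plain
    # string or a dict that contains an "observation" key.
    return {"observation": f"It is sunny in {city} today."}
```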
diff --git a/agentfly/agents/agent_base.py b/agentfly/agents/agent_base.py
index 9df34c1..141678c 100644
--- a/agentfly/agents/agent_base.py
+++ b/agentfly/agents/agent_base.py
@@ -55,7 +55,7 @@ def __init__(
         log_file: str = "agent",
         streaming: str = "console",
         debug: bool = False,
-        monitors: List[str] = [],
+        monitors: List[str] = ["wandb"],
         wandb_project_name: str = None,
         wandb_run_name: str = None,
         local_cache_dir: str = None,
@@ -184,6 +184,12 @@ def _preprocess_messages(self, messages: List[Dict]):
         return messages_list.to_list()
 
+    def _preprocess_backends(self):
+        self.llm_engine.preprocess()
+
+    def _postprocess_backends(self):
+        self.llm_engine.postprocess()
+
     def _initialize_monitor(self, monitors: List[str]) -> None:
         for monitor in monitors:
             if monitor == "local":
@@ -212,14 +218,17 @@ async def run(self,
         """
         processed_messages = self._preprocess_messages(messages)
+        self._preprocess_backends()
 
-        return await self.run_async(
+        await self.run_async(
            processed_messages,
            max_turns=max_turns,
            generation_config=generation_config,
            **kwargs,
        )
 
+        self._postprocess_backends()
+
     def set_llm_engine(self, llm_engine: Any, tokenizer: Any, processor: Any):
         assert self.backend == "async_verl", "Only async verl backend is supported for now"
diff --git a/agentfly/agents/llm_backends/backend_configs.py b/agentfly/agents/llm_backends/backend_configs.py
index a8974b1..e56be8f 100644
--- a/agentfly/agents/llm_backends/backend_configs.py
+++ b/agentfly/agents/llm_backends/backend_configs.py
@@ -1,4 +1,4 @@
-from dataclasses import dataclass
+from dataclasses import dataclass, field
 from typing import Optional, Dict, Any, List
 from vllm import AsyncEngineArgs
 
@@ -39,7 +39,7 @@ class VLLMConfig:
 
-@dataclass
+@dataclass(init=False)
 class AsyncVLLMConfig:
     """Configuration for Async VLLM backend with engine arguments. Arguments are the same as vLLM's arguments, which can be found at https://docs.vllm.ai/en/latest/configuration/engine_args.html. Here listed some important arguments:
@@ -53,10 +53,21 @@ class AsyncVLLMConfig:
         data_parallel_size (int): Data parallel size.
         tensor_parallel_size (int): Tensor parallel size.
     """
-    engine_args: AsyncEngineArgs = AsyncEngineArgs()
-
-    def __init__(self, **kwargs):
-        self.engine_args = AsyncEngineArgs(**kwargs)
+    engine_args: AsyncEngineArgs
+
+    def __init__(self, engine_args: Optional[AsyncEngineArgs] = None, **kwargs):
+        """Initialize AsyncVLLMConfig.
+
+        Args:
+            engine_args: Optional AsyncEngineArgs instance. If provided, kwargs are ignored.
+            **kwargs: Arguments to pass to AsyncEngineArgs if engine_args is not provided.
+        """
+        if engine_args is not None:
+            self.engine_args = engine_args
+        elif kwargs:
+            self.engine_args = AsyncEngineArgs(**kwargs)
+        else:
+            self.engine_args = AsyncEngineArgs()
 
 
 @dataclass
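A quick usage sketch of the two construction paths the rewritten `AsyncVLLMConfig.__init__` above supports; the model name is a placeholder, and the sketch assumes vLLM is installed:

```python
# Hypothetical usage of the AsyncVLLMConfig change above; the model name is a
# placeholder, not taken from this diff.
from vllm import AsyncEngineArgs
from agentfly.agents.llm_backends.backend_configs import AsyncVLLMConfig

# Path 1: keyword arguments are forwarded to AsyncEngineArgs.
cfg_a = AsyncVLLMConfig(model="Qwen/Qwen2.5-7B-Instruct", tensor_parallel_size=2)

# Path 2: a pre-built AsyncEngineArgs is passed through unchanged (kwargs ignored).
cfg_b = AsyncVLLMConfig(engine_args=AsyncEngineArgs(model="Qwen/Qwen2.5-7B-Instruct"))

assert isinstance(cfg_a.engine_args, AsyncEngineArgs)
assert isinstance(cfg_b.engine_args, AsyncEngineArgs)
```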
""" import asyncio -from asyncore import loop -from collections import deque import copy from functools import partial import time @@ -54,6 +52,10 @@ def apply_chat_template(self, messages_list: List[List[Dict]], template: str, ad vision_inputs.append(chat.vision_inputs()) return prompts, vision_inputs + + def prepare(self): + """Prepare the backend""" + pass def generate(self, messages_list: str, **kwargs) -> str: """Generate text from prompt""" @@ -404,6 +406,18 @@ def __init__(self, llm_engine, model_name_or_path: str, template: str, max_lengt trust_remote_code=True, ) self.llm_engine = llm_engine + + def preprocess(self): + """Preprocess the backend""" + self.llm_engine.wake_up() + if self.llm_engine.reward_model_manager: + self.llm_engine.reward_model_manager.wake_up() + + def postprocess(self): + """Postprocess the backend""" + self.llm_engine.sleep() + if self.llm_engine.reward_model_manager: + self.llm_engine.reward_model_manager.sleep() def _process_inputs(self, prompts: List[str], vision_inputs: Dict[str, List[PIL.Image.Image]]): inputs = [] @@ -433,6 +447,22 @@ def _convert_to_openai_chat_without_tool_call_processing(self, messages: list) - if "tool_choice" in message: del message["tool_choice"] return messages + + def _process_messages(self, messages: List[Dict]): + new_messages = [] + for message in messages: + new_message = {} + new_message.update(message) + if isinstance(message["content"], list): + if len(message["content"]) == 1: + assert message["content"][0]["type"] == "text" + new_message["content"] = message["content"][0]["text"] + else: + new_message["content"] = message["content"] + + new_messages.append(new_message) + return new_messages + async def generate_async(self, messages_list: str, **kwargs) -> str: """Generate text from prompt using Verl""" @@ -440,6 +470,8 @@ async def generate_async(self, messages_list: str, **kwargs) -> str: generation_config = {} tensors = torch.ones(len(messages_list), dtype=torch.int64) + # messages_list = [self._convert_to_openai_chat_without_tool_call_processing(messages) for messages in messages_list] + messages_list = [self._process_messages(messages) for messages in messages_list] messages_list = [self._convert_to_openai_chat_without_tool_call_processing(messages) for messages in messages_list] tools = kwargs.get("tools", None) tools_list = np.array([tools] * len(messages_list)) @@ -453,8 +485,11 @@ async def generate_async(self, messages_list: str, **kwargs) -> str: batch = DataProto.from_single_dict(data, meta_info={"n": n, "temperature": temperature}) - gen_batch_output = await self.llm_engine.generate_sequences_async(batch, **generation_config) - response_texts = gen_batch_output.batch['responses'].tolist() # np.array of strings with length BS + gen_batch_output = await self.llm_engine.generate_sequences_async(batch) + response_ids = gen_batch_output.batch['responses'].tolist() # np.array of strings with length BS + assert len(response_ids) == len(messages_list) + response_texts = [self.tokenizer.decode(response_id, skip_special_tokens=True) for response_id in response_ids] + return response_texts diff --git a/agentfly/agents/specialized/hf_agent.py b/agentfly/agents/specialized/hf_agent.py index eee913f..9023338 100644 --- a/agentfly/agents/specialized/hf_agent.py +++ b/agentfly/agents/specialized/hf_agent.py @@ -5,12 +5,16 @@ from typing import List from ..agent_base import BaseAgent from ..parsers import extract_tool_calls +import logging + +logger = logging.getLogger(__file__) class HFAgent(BaseAgent): def __init__(self, 
diff --git a/agentfly/agents/specialized/hf_agent.py b/agentfly/agents/specialized/hf_agent.py
index eee913f..9023338 100644
--- a/agentfly/agents/specialized/hf_agent.py
+++ b/agentfly/agents/specialized/hf_agent.py
@@ -5,12 +5,16 @@
 from typing import List
 from ..agent_base import BaseAgent
 from ..parsers import extract_tool_calls
+import logging
+
+logger = logging.getLogger(__file__)
 
 
 class HFAgent(BaseAgent):
     def __init__(self,
                  model_name_or_path: str, **kwargs):
         super().__init__(model_name_or_path, **kwargs)
 
     def parse(self, responses: List[str], **kwargs) -> List[Dict]:
+        logger.debug(f"[HFAgent] Responses: {responses}")
         new_messages_list = []
         for response in responses:
             tool_calls = extract_tool_calls(response)
diff --git a/agentfly/envs/manager/resource.py b/agentfly/envs/manager/resource.py
index 8b32db5..207ea0e 100644
--- a/agentfly/envs/manager/resource.py
+++ b/agentfly/envs/manager/resource.py
@@ -19,8 +19,8 @@ def cleanup_envs():
     for env in tqdm(GLOBAL_ENVS):
         env.close()
 
-import atexit, signal
+# import atexit, signal
 
-atexit.register(cleanup_envs)
-for sig in [signal.SIGTERM, signal.SIGINT]:
-    signal.signal(sig, cleanup_envs)
+# atexit.register(cleanup_envs)
+# for sig in [signal.SIGTERM, signal.SIGINT]:
+#     signal.signal(sig, cleanup_envs)
diff --git a/agentfly/tests/scripts/test_cpu_runs.sh b/agentfly/tests/scripts/test_cpu_runs.sh
index c3f0fc9..05a82a0 100644
--- a/agentfly/tests/scripts/test_cpu_runs.sh
+++ b/agentfly/tests/scripts/test_cpu_runs.sh
@@ -3,8 +3,7 @@
 
 # Test CPU runs
 
-pytest -x agentfly/tests/unit/tools/
-pytest -x agentfly/tests/unit/envs/
-pytest -x agentfly/tests/unit/rewards/
-
-pytest -x agentfly/tests/unit/templates/
\ No newline at end of file
+pytest -x agentfly/tests/unit/tools/ || exit 1
+pytest -x agentfly/tests/unit/envs/ || exit 1
+pytest -x agentfly/tests/unit/rewards/ || exit 1
+pytest -x agentfly/tests/unit/templates/ || exit 1
\ No newline at end of file
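The HFAgent change earlier in this patch only adds a `logger.debug` call, so it stays silent unless the process configures logging. A generic setup that would surface it, using only the standard library (nothing here is an agentfly API):

```python
# Standard-library logging setup; nothing here is agentfly-specific.
import logging

# The HFAgent logger is created with logging.getLogger(__file__), so its name
# is the module's file path; raising the root level to DEBUG is the simplest
# way to see the new "[HFAgent] Responses: ..." lines.
logging.basicConfig(level=logging.DEBUG, format="%(name)s %(levelname)s %(message)s")
```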
diff --git a/agentfly/utils/monitor.py b/agentfly/utils/monitor.py
index 2dd66f1..023831b 100644
--- a/agentfly/utils/monitor.py
+++ b/agentfly/utils/monitor.py
@@ -188,50 +188,71 @@ async def close(self) -> None:  # pragma: no cover
         wandb.finish()
 
 
-# Example of a wrapper sink that filters kinds/names without touching producers
-class FilterSink(BaseSink):
-    """Wrap another sink and allow include/exclude rules."""
-
-    def __init__(
-        self,
-        wrapped: BaseSink,
-        include_kinds: Optional[List[str]] = None,
-        exclude_kinds: Optional[List[str]] = None,
-    ) -> None:
-        self.wrapped = wrapped
-        self.include = set(include_kinds or [])
-        self.exclude = set(exclude_kinds or [])
-
-    async def log(self, evt: MetricEvent) -> None:
-        if self.include and evt.kind not in self.include:
-            return
-        if evt.kind in self.exclude:
-            return
-        await self.wrapped.log(evt)
-
-    async def flush(self) -> None:
-        await self.wrapped.flush()
-
-    async def close(self) -> None:
-        await self.wrapped.close()
-
-
 class Monitor:
     """Singleton helper controlling the consumer task and registered sinks."""
 
     _sinks: Dict[str, BaseSink] = {}
-    _queue: "asyncio.Queue[MetricEvent | None]" = asyncio.Queue()
+    _queue: Optional["asyncio.Queue[MetricEvent | None]"] = None
+    _queue_loop: Optional[asyncio.AbstractEventLoop] = None
    _consumer_task: Optional[asyncio.Task[None]] = None
    _running: bool = False
 
     # ── lifecycle ──────────────────────────────────────────────────────────
+    @classmethod
+    def _ensure_queue(cls) -> "asyncio.Queue[MetricEvent | None]":
+        """Ensure queue exists and is bound to the current event loop."""
+        try:
+            current_loop = asyncio.get_running_loop()
+        except RuntimeError:
+            # No running event loop - create queue without binding (will bind on first use)
+            if cls._queue is None:
+                cls._queue = asyncio.Queue()
+                cls._queue_loop = None
+            return cls._queue
+
+        # Check if queue needs to be recreated for current event loop
+        if cls._queue is None or cls._queue_loop is not current_loop:
+            # Recreate queue for current event loop
+            # Note: We can't migrate events from old queue, but that's acceptable
+            # since events are best-effort and the old loop is closed anyway
+            cls._queue = asyncio.Queue()
+            cls._queue_loop = current_loop
+
+        return cls._queue
+
     @classmethod
     def ensure_started(cls) -> None:
-        if cls._running:
-            return
-        cls._consumer_task = asyncio.create_task(cls._consumer_loop(), name="monitor-consumer")
-        cls._running = True
+        # Check if consumer task is still running
+        if cls._running and cls._consumer_task is not None:
+            try:
+                # Check if task is done or cancelled
+                if cls._consumer_task.done():
+                    # Task completed/cancelled, need to restart
+                    cls._running = False
+                    cls._consumer_task = None
+                else:
+                    # Task is still running, nothing to do
+                    return
+            except (RuntimeError, AttributeError, Exception):
+                # Task might be from a different event loop that was closed
+                # or the task object might be invalid
+                cls._running = False
+                cls._consumer_task = None
+
+        # Ensure queue is bound to current event loop
+        cls._ensure_queue()
+
+        # Create new consumer task
+        try:
+            loop = asyncio.get_running_loop()
+            cls._consumer_task = loop.create_task(cls._consumer_loop(), name="monitor-consumer")
+            cls._running = True
+        except RuntimeError:
+            # No running event loop - this shouldn't happen in normal usage
+            # but we'll handle it gracefully
+            print("[Monitor] Warning: No running event loop found. Monitor consumer not started.")
+            cls._running = False
+            cls._consumer_task = None
 
     @classmethod
     async def shutdown(cls) -> None:
@@ -239,16 +260,18 @@ async def shutdown(cls) -> None:
         if not cls._running:
             return
 
+        # Ensure queue exists
+        queue = cls._ensure_queue()
         # send sentinel
-        await cls._queue.put(None)
+        await queue.put(None)
         await cls._consumer_task
 
         for sink in list(cls._sinks.values()):
             with contextlib.suppress(Exception):
                 await sink.close()
         cls._sinks.clear()
         cls._running = False
-
-    # ── sink management ────────────────────────────────────────────────────
+        cls._queue = None
+        cls._queue_loop = None
 
     @classmethod
     def add_sink(cls, name: str, sink: BaseSink) -> None:
@@ -264,12 +287,11 @@ async def _close() -> None:
                 await sink.close()
             asyncio.create_task(_close())
 
-    # ── core consumer ──────────────────────────────────────────────────────
-
     @classmethod
     async def _consumer_loop(cls) -> None:
+        queue = cls._ensure_queue()
         while True:
-            evt = await cls._queue.get()
+            evt = await queue.get()
             if evt is None:  # sentinel
                 break
             for sink_name, sink in list(cls._sinks.items()):
@@ -281,93 +303,34 @@ async def _consumer_loop(cls) -> None:
                 except Exception as exc:
                     print(f"[Monitor] Sink {sink!r} failed: {exc}")
         # drain any remaining events (best-effort)
-        while not cls._queue.empty():
-            cls._queue.get_nowait()
-
+        while not queue.empty():
+            queue.get_nowait()
 
 
 def emit(evt: MetricEvent) -> None:
     """Enqueue an event for asynchronous processing (non-blocking)."""
     Monitor.ensure_started()
+    queue = Monitor._ensure_queue()
     try:
-        Monitor._queue.put_nowait(evt)
-    except asyncio.QueueFull:  # extremely unlikely - drop oldest
-        Monitor._queue.get_nowait()
-        Monitor._queue.put_nowait(evt)
-
-
-class ResourcePoller:
-    """Emit process RSS/CPU every *interval* seconds."""
-
-    def __init__(self, interval: float = 10.0, *, run_id: str | None = None):
-        import psutil  # heavyweight; keep local to avoid hard dep for everybody
-
-        self.psutil = psutil
-        self.interval = interval
-        self.run_id = run_id
-        self._task: Optional[asyncio.Task[None]] = None
-
-    # API -------------------------------------------------------------------
-    def start(self) -> None:
-        if self._task is None:
-            self._task = asyncio.create_task(self._loop(), name="resource-poller")
-
-    def stop(self) -> None:
-        if self._task:
-            self._task.cancel()
-
-    # internals --------------------------------------------------------------
-    async def _loop(self) -> None:
-        proc = self.psutil.Process(os.getpid())
-        while True:
-            rss_mb = proc.memory_info().rss / 1e6
-            cpu = proc.cpu_percent(interval=None)  # % since last call
-            emit(
-                MetricEvent(
-                    kind="resource",
-                    name="memory/rss_mb",
-                    value=rss_mb,
-                    tags={"run": self.run_id} if self.run_id else {},
-                )
-            )
-            emit(
-                MetricEvent(
-                    kind="resource",
-                    name="cpu/percent",
-                    value=cpu,
-                    tags={"run": self.run_id} if self.run_id else {},
-                )
-            )
-            await asyncio.sleep(self.interval)
-
-
-
-async def _demo() -> None:  # pragma: no cover
-    """Run with `python -m monitoring` to see events flowing."""
-
-    # 1. sinks ----------------------------------------------------------------
-    Monitor.add_sink("jsonl", JsonlSink("demo_metrics.jsonl"))
-
-    try:
-        Monitor.add_sink("wandb", WandbSink(project="agentrl-demo"))
-    except ModuleNotFoundError:
-        print("wandb not installed - skipping WandbSink")
-
-    # 2. start poller ---------------------------------------------------------
-    poller = ResourcePoller(interval=5.0)
-    poller.start()
-
-    # 3. produce some fake scalar metrics ------------------------------------
-    for step in range(20):
-        reward = 1.0 - (step / 20)
-        emit(MetricEvent("scalar", "reward/step", reward, step=step))
-        await asyncio.sleep(0.5)
-
-    # 4. graceful shutdown ----------------------------------------------------
-    poller.stop()
-    await Monitor.shutdown()
-
+        queue.put_nowait(evt)
+    except (asyncio.QueueFull, RuntimeError) as e:
+        # QueueFull: extremely unlikely - drop oldest
+        # RuntimeError: queue not bound to current event loop (shouldn't happen in normal usage)
+        if isinstance(e, RuntimeError) and "bound to a different event loop" in str(e):
+            # Queue is bound to different loop, recreate it
+            Monitor._queue = None
+            Monitor._queue_loop = None
+            queue = Monitor._ensure_queue()
+            try:
+                queue.put_nowait(evt)
+            except asyncio.QueueFull:
+                queue.get_nowait()
+                queue.put_nowait(evt)
+        elif isinstance(e, asyncio.QueueFull):
+            queue.get_nowait()
+            queue.put_nowait(evt)
+        else:
+            # Re-raise other RuntimeErrors
+            raise
-if __name__ == "__main__":
-    asyncio.run(_demo())
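The monitor.py changes above all address one asyncio behavior: an `asyncio.Queue` that has already been used in one event loop raises a "bound to a different event loop" RuntimeError when touched from another, for example across separate `asyncio.run()` calls. A minimal standalone sketch of the recreate-per-loop pattern (illustration only, not agentfly code):

```python
# Standalone sketch of the recreate-per-loop pattern used by _ensure_queue above.
import asyncio
from typing import Optional

class LazyQueue:
    _queue: Optional[asyncio.Queue] = None
    _loop: Optional[asyncio.AbstractEventLoop] = None

    @classmethod
    def get(cls) -> asyncio.Queue:
        # Recreate the queue whenever the running loop changes (e.g. across
        # separate asyncio.run() calls) instead of reusing a queue that was
        # already used in a now-closed loop.
        loop = asyncio.get_running_loop()
        if cls._queue is None or cls._loop is not loop:
            cls._queue = asyncio.Queue()
            cls._loop = loop
        return cls._queue

async def main():
    q = LazyQueue.get()
    await q.put("event")
    print(await q.get())

asyncio.run(main())  # first loop
asyncio.run(main())  # second loop: the queue is transparently recreated
```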
diff --git a/assets/images/wechat.jpg b/assets/images/wechat.jpg
index 730c516..7877f44 100644
Binary files a/assets/images/wechat.jpg and b/assets/images/wechat.jpg differ
diff --git a/pyproject.toml b/pyproject.toml
index 55ca2b0..2e333c0 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -12,10 +12,10 @@ packages = ["agentfly"]
 
 [project]
 name = "AgentFly"
-version = "0.0.1"
+version = "0.1.0"
 description = "Agent reinforcement learning framework."
 readme = "README.md"
-requires-python = ">=3.10,<3.13" # Release the version limit for Python 3.12
+requires-python = ">=3.12,<3.13" # Python 3.12.x
 license = { text = "Apache-2.0" }
 
 dependencies = [
@@ -54,7 +54,7 @@ verl = [
     "pylatexenc",
     "pre-commit",
     "ray[default]",
-    "tensordict<=0.6.2",
+    "tensordict",
     "torchdata",
     "transformers",
     "packaging>=20.0",
diff --git a/verl b/verl
index a06481c..dc6bed0 160000
--- a/verl
+++ b/verl
@@ -1 +1 @@
-Subproject commit a06481c311ce6081fde33c979c539b6adc10be3b
+Subproject commit dc6bed0bea2df926d151478a78311e4552014abc