Description
I know this may not strictly be an issue, but I would like to get an answer from Python and PyTorch gurus. (I am not a CS major, but I love deep RL.)
In homework 2's PDF, you said:
A serious bottleneck in the learning, for more complex environments, is the sample collection time. In infrastructure/rl_trainer.py, we only collect trajectories in a single thread, but this process can be fully parallelized across threads to get a useful speedup. Implement the parallelization and report on the difference in training time.
I have tried using multiprocessing in cs285.infrastructure.utils.sample_trajectories:
def sample_trajectories(env, policy, min_timesteps_per_batch, max_path_length, render=False, render_mode=('rgb_array')):
    import multiprocessing as mp
    try:
        mp.set_start_method('spawn')
    except RuntimeError:
        pass
    mp_paths_manager = mp.Manager()
    mp_paths = mp_paths_manager.list()
    mp_timesteps_this_batch = mp.Value('i', 0)
    enough_event = mp.Event()
    processes = []
    num_process = mp.cpu_count()
    # num_process = 6  # spawning a new process also requires new GPU memory
    for _ in range(num_process):
        p = mp.Process(target=sample_trajectory,
                       args=(mp_timesteps_this_batch, min_timesteps_per_batch, mp_paths, enough_event,
                             env, policy, max_path_length, render, render_mode))
        p.start()
        processes.append(p)
    enough_event.wait()
    for p in processes:
        p.terminate()
    for p in processes:
        p.join()
    with mp_timesteps_this_batch.get_lock():
        timesteps_this_batch = mp_timesteps_this_batch.value
    paths = list(mp_paths)
    return paths, timesteps_this_batch

and modified cs285.infrastructure.utils.sample_trajectory with:
def sample_trajectory(mp_timesteps_this_batch, min_timesteps_per_batch, mp_paths, enough_event,
                      env, policy, max_path_length, render=False, render_mode=('rgb_array')):
    while True:
        # initialize env for the beginning of a new rollout
        ob = env.reset()  # HINT: should be the output of resetting the env
        ......
        path_to_append = Path(obs, image_obs, acs, rewards, next_obs, terminals)
        len_path_to_append = get_pathlength(path_to_append)
        with mp_timesteps_this_batch.get_lock():
            if mp_timesteps_this_batch.value >= min_timesteps_per_batch:
                enough_event.set()
            elif mp_timesteps_this_batch.value + len_path_to_append >= min_timesteps_per_batch:
                mp_paths.append(path_to_append)
                mp_timesteps_this_batch.value += len_path_to_append
                enough_event.set()
            else:
                mp_paths.append(path_to_append)
                mp_timesteps_this_batch.value += len_path_to_append

However, because sample_trajectory uses policy.get_action, which involves CUDA operations, many errors are thrown, such as:
CUDA error: an illegal memory access was encountered.
It seems to work fine when running entirely on the CPU.
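One workaround I have been considering (a minimal sketch only, not the course's reference solution) is to keep CUDA out of the child processes entirely: move the policy's weights to CPU in the parent, pass only the CPU state_dict across the process boundary, and rebuild a CPU-only copy of the policy inside each worker. The make_policy constructor, worker function, and dummy observations below are hypothetical placeholders, not part of the homework code:

import multiprocessing as mp
import torch
import torch.nn as nn

def make_policy():
    # Hypothetical stand-in for the homework's policy constructor.
    return nn.Linear(4, 2)

def worker(cpu_state_dict, result_queue, n_steps):
    # Rebuild the policy inside the child process, strictly on CPU,
    # so the worker never touches a CUDA context.
    policy = make_policy()
    policy.load_state_dict(cpu_state_dict)
    policy.eval()
    obs = torch.randn(n_steps, 4)  # stand-in for observations collected from the env
    with torch.no_grad():
        acts = policy(obs)  # CPU-only forward pass
    result_queue.put((obs.numpy(), acts.numpy()))

if __name__ == "__main__":
    mp.set_start_method("spawn", force=True)
    policy = make_policy()
    if torch.cuda.is_available():
        policy = policy.cuda()  # the parent process can keep the GPU copy for training
    # Only CPU tensors cross the process boundary.
    cpu_state_dict = {k: v.cpu() for k, v in policy.state_dict().items()}
    queue = mp.Queue()
    procs = [mp.Process(target=worker, args=(cpu_state_dict, queue, 8)) for _ in range(2)]
    for p in procs:
        p.start()
    results = [queue.get() for _ in procs]  # drain the queue before joining
    for p in procs:
        p.join()
    print(len(results), "workers returned rollout batches")

With something like this, only environment stepping and a CPU forward pass would happen in the workers, while the main process keeps the CUDA copy of the policy for training. I am not sure this is the intended approach, though.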
So, can anyone give me some thoughts on how to implement a parallel version of trajectory collection?
Thanks a lot!