"""Augments OpenAI Gym environments with features like experience replay.
Heavily influenced by DeepMind's seminal paper 'Playing Atari with Deep Reinforcement Learning'
(Mnih et al., 2013) and 'Human-level control through deep reinforcement learning' (Mnih et al.,
2015).
"""
import gym
import numpy as np
import random
import time
from gym import wrappers


class EnvironmentWrapper:
  """Wraps an OpenAI Gym environment and provides experience replay."""

  def __init__(self,
               env_name,
               max_episode_length,
               replay_memory_capacity,
               action_space=None,
               save_path=None):
    """Creates the wrapper.

    Args:
      env_name: Name of an OpenAI Gym environment.
      max_episode_length: Maximum number of time steps per episode. When this number of time
        steps is reached, the episode terminates early.
      replay_memory_capacity: Number of experiences remembered. Conceptually, an experience is
        a (state, action, reward, next_state, done) tuple. The replay memory is sampled by
        the agent during training.
      action_space: A list of possible actions. If 'action_space' is 'None' and no default
        configuration exists for this environment, all actions will be allowed.
      save_path: Path where experiments and videos are saved.
    """
    self.gym_env = gym.make(env_name)

    if save_path:
      self.gym_env = wrappers.Monitor(self.gym_env, save_path)

    self.max_episode_length = max_episode_length
    self.replay_memory_capacity = replay_memory_capacity
    self.num_features = self.gym_env.observation_space.shape[0]
    self.reset()

    if action_space:
      self.action_space = list(action_space)
    else:
      self.action_space = list(range(self.gym_env.action_space.n))

    self.num_actions = len(self.action_space)

    # Create replay memory. Arrays are used instead of double-ended queues for faster indexing.
    self.num_exp = 0
    self.actions = np.empty(replay_memory_capacity, np.uint8)
    self.rewards = np.empty(replay_memory_capacity, np.int8)
    self.ongoing = np.empty(replay_memory_capacity, np.bool_)

    # Used for computing both 'current' and 'next' states: observations[i] is the state seen
    # before action i, and observations[i + 1] is the resulting next state.
    self.observations = np.empty([replay_memory_capacity + 1, self.num_features], np.float32)

    # Initialize the first state.
    self.observations[0], _, _, _ = self.gym_env.step(self.sample_action())

    # Initialize the first experience by performing one more random action.
    self.step(self.sample_action())

  def reset(self):
    """Resets the environment."""
    self.done = False
    self.gym_env.reset()
    self.episode_reward = 0
    self.episode_length = 0
    self.episode_start_time = time.time()
    self.episode_run_time = 0
    self.fps = 0

  def step(self, action):
    """Performs the specified action.

    Returns:
      The reward.

    Raises:
      ValueError: If the action is not valid.
    """
    if self.done:
      self.reset()

    if action not in self.action_space:
      raise ValueError('Action "{}" is invalid. Valid actions: {}.'.format(action,
                                                                           self.action_space))

    observation, reward, self.done, _ = self.gym_env.step(action)
    self.episode_reward += reward
    self.episode_length += 1
    self.episode_run_time = time.time() - self.episode_start_time
    self.fps = 0 if self.episode_run_time == 0 else self.episode_length / self.episode_run_time

    if self.episode_length == self.max_episode_length:
      self.done = True

    # Remember this experience.
    self.actions[self.num_exp] = action
    self.rewards[self.num_exp] = reward
    self.ongoing[self.num_exp] = not self.done
    self.observations[self.num_exp + 1] = observation
    self.num_exp += 1

    if self.num_exp == self.replay_memory_capacity:
      # Free up space by deleting half of the oldest experiences.
      mid = int(self.num_exp / 2)
      end = 2 * mid
      self.num_exp = mid
      self.actions[:mid] = self.actions[mid:end]
      self.rewards[:mid] = self.rewards[mid:end]
      self.ongoing[:mid] = self.ongoing[mid:end]
      self.observations[:mid + 1] = self.observations[mid:end + 1]

    return reward

  def render(self):
    """Draws the environment."""
    self.gym_env.render()

  def sample_action(self):
    """Samples a random action."""
    return random.choice(self.action_space)

  def sample_experiences(self, exp_count):
    """Randomly samples experiences from the replay memory. May contain duplicates.

    Args:
      exp_count: Number of experiences to sample.

    Returns:
      A (states, actions, rewards, next_states, ongoing) tuple. The boolean array, 'ongoing',
      indicates whether the corresponding 'next_states' are non-terminal states.
    """
    indexes = np.random.choice(self.num_exp, exp_count)
    actions = self.actions[indexes]
    rewards = self.rewards[indexes]
    ongoing = self.ongoing[indexes]
    states = self.observations[indexes]
    next_states = self.observations[indexes + 1]

    return states, actions, rewards, next_states, ongoing

  def get_state(self):
    """Gets the current state.

    Returns:
      A tensor with float32 values.
    """
    return np.expand_dims(self.observations[self.num_exp], axis=0)
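

if __name__ == '__main__':
  # Minimal usage sketch (not part of the original module): drives the wrapper with random
  # actions and draws one batch from the replay memory. The environment name, episode length,
  # memory capacity, step count and batch size below are illustrative assumptions, not values
  # prescribed by this module.
  env = EnvironmentWrapper('CartPole-v0',
                           max_episode_length=200,
                           replay_memory_capacity=1000)

  # Fill the replay memory by acting randomly; step() resets automatically when an episode ends.
  for _ in range(500):
    env.step(env.sample_action())

  # Sample a batch of experiences, as an agent would during training.
  states, actions, rewards, next_states, ongoing = env.sample_experiences(32)
  print('states:', states.shape, 'actions:', actions.shape, 'ongoing:', ongoing.shape)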