operator_illustrator.py
import tensorflow as tf
import keras
import numpy as np
import matplotlib.pyplot as plt
from functools import reduce
from typing import Callable, List, Optional, Union
import argparse

from cmorl.utils.loss_composition import simple_p_mean, then, curriculum, p_mean

# NOTE: Remember that the OR operator needs perturbation
# TODO: Implement the OR operator using De Morgan's law
# TODO: Implement the AND operator
# TODO: Rename the curriculum function to offset or prioritize
# TODO: Remove the class structure and make everything a tf.function
# TODO: Case 1: Showcase how the OR operator functions on 4 variables
# TODO: Case 2: Showcase how the AND operator functions on 4 variables
# TODO: Case 3: Showcase how the offset operator functions on 4 variables
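
# Sketch (not the project's implementation of the OR TODO above; the helper name
# demorgan_or is illustrative only): De Morgan's law can turn the p_mean soft-AND
# composer into a soft OR, assuming the objectives are scaled to [0, 1]. A small
# epsilon perturbation keeps the negative-p mean away from its singularity at 0
# (see the NOTE above).
def demorgan_or(objectives, p, epsilon=1e-6):
    # OR(o_1, ..., o_n) = 1 - AND(1 - o_1, ..., 1 - o_n)
    return 1.0 - p_mean(1.0 - objectives + epsilon, p)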


class RewardOptimizer:
    def __init__(
        self,
        num_variables: int = 4,
        learning_rate: float = 0.01,
        num_steps: int = 1000,
        competitiveness: float = 0.2,
        randomness: float = 0.01,
    ):
        """
        Initialize the reward optimization problem

        Args:
            num_variables: Number of variables to optimize (default: 4)
            learning_rate: Learning rate for gradient descent (default: 0.01)
            num_steps: Number of optimization steps (default: 1000)
            competitiveness: Amount of competition between the objectives (default: 0.2)
            randomness: Amount of random perturbation of the initial values (default: 0.01)
        """
        self.num_variables = num_variables
        self.learning_rate = learning_rate
        self.num_steps = num_steps
        # Initialize variables to optimize: small positive values with a random perturbation
        initial_values = np.ones(num_variables) * 0.01 + np.random.rand(num_variables) * randomness
        # initial_values[0] = 0.5
        self.variables = tf.Variable(initial_values, dtype=tf.float32)
        # Create optimizer
        self.optimizer = keras.optimizers.SGD(learning_rate=learning_rate)
        # Storage for plotting
        self.o_history = []
        self.reward_history = []
        self.competitiveness = competitiveness

    def compute_outputs(self) -> tf.Tensor:
        """
        Compute the output values for each variable

        Returns:
            tf.Tensor: Tensor of output values following the competitive formula
        """
        uncompeting_objectives = tf.tanh(tf.abs(self.variables))
        # Each objective is damped by the mean of the other objectives, so the
        # objectives compete for magnitude.
        others_means = (tf.reduce_sum(uncompeting_objectives) - uncompeting_objectives) / (self.num_variables - 1)
        outputs = uncompeting_objectives * (1 - others_means * self.competitiveness)
        return outputs
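
    # Illustrative note (not from the original code): with competitiveness=0.2 and
    # the other three objectives already near 1.0, each remaining output is scaled
    # by roughly (1 - 1.0 * 0.2) = 0.8, so no objective can reach 1.0 while the
    # others stay high.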

    def optimize(
        self,
        reward_composer: Callable[[List[tf.Tensor], float], tf.Tensor],
        p_value: float = -2.0,
    ):
        """
        Run the optimization process

        Args:
            reward_composer: Function that takes (outputs, p) and returns a reward
            p_value: Parameter for p-mean composition (default: -2.0)
        """
        for step in range(self.num_steps):
            with tf.GradientTape() as tape:
                # Calculate outputs
                outputs = self.compute_outputs()
                # Minimize the negative of the composed reward
                loss = -reward_composer(outputs, p=p_value)
            # Calculate and apply gradients
            gradients = tape.gradient(loss, [self.variables])
            self.optimizer.apply_gradients(zip(gradients, [self.variables]))
            # Store values for plotting
            self.o_history.append([float(o) for o in outputs])
            self.reward_history.append(float(-loss))
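
    # Illustrative note (not part of the original code): for the p-mean composers,
    # a negative p is dominated by the smallest objective (p -> -inf recovers the
    # minimum), so gradient ascent on the composed reward raises the worst output first.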

    def plot_results(self, p_value: float, save_path: Optional[str] = None):
        """
        Plot the optimization results

        Args:
            p_value: P-value used in the optimization (used in the default file name)
            save_path: Optional path to save the plot (default: None)
        """
        plt.figure(figsize=(12, 4))

        # Plot outputs
        plt.subplot(1, 2, 1)
        for i in range(self.num_variables):
            plt.plot([o[i] for o in self.o_history], label=f"o{i+1}")
        plt.xlabel("Gradient Steps")
        plt.ylabel("Output Values")
        plt.title("Output Values vs. Gradient Steps")
        plt.legend()
        plt.grid(True)

        # Plot reward
        plt.subplot(1, 2, 2)
        plt.plot(self.reward_history, label="Reward")
        plt.xlabel("Gradient Steps")
        plt.ylabel("Reward Value")
        plt.title("Reward vs. Gradient Steps")
        plt.legend()
        plt.grid(True)

        plt.tight_layout()
        if save_path is None:
            save_path = f"toy_ex_APS_P_mean_{p_value}_lr_{self.learning_rate}_steps_{self.num_steps}.png"
        plt.savefig(save_path)
        plt.show()


def main(
    num_variables: int = 4,
    learning_rate: float = 0.01,
    num_steps: int = 1000,
    p_value: float = -2.0,
    slack: float = 0.1,
    reward_type: str = "curriculum",
    competitiveness: float = 0.2,
    randomness: float = 0.01,
):
    """
    Main function to run the optimization experiment

    Args:
        num_variables: Number of variables to optimize (default: 4)
        learning_rate: Learning rate for gradient descent (default: 0.01)
        num_steps: Number of optimization steps (default: 1000)
        p_value: Parameter for p-mean composition (default: -2.0)
        slack: Slack parameter for curriculum composer (default: 0.1)
        reward_type: Type of reward composer to use ["curriculum", "pmean", "pmean_stable"] (default: "curriculum")
        competitiveness: Amount of competition between the objectives (default: 0.2)
        randomness: Amount of randomness in the initial values (default: 0.01)
    """
    # Create optimizer instance
    optimizer = RewardOptimizer(
        num_variables=num_variables,
        learning_rate=learning_rate,
        num_steps=num_steps,
        competitiveness=competitiveness,
        randomness=randomness,
    )

    # Select reward composer based on type
    if reward_type == "curriculum":
        reward_composer = lambda outputs, p: curriculum(outputs, slack=slack, p=p)
        save_path = f"curriculum_slack_{slack}_p_{p_value}_lr_{learning_rate}_steps_{num_steps}.png"
    elif reward_type == "pmean":
        reward_composer = lambda outputs, p: simple_p_mean(outputs, p)
        save_path = f"pmean_p_{p_value}_lr_{learning_rate}_steps_{num_steps}.png"
    elif reward_type == "pmean_stable":
        reward_composer = lambda outputs, p: p_mean(outputs, p)
        save_path = f"pmean_stable_p_{p_value}_lr_{learning_rate}_steps_{num_steps}.png"
    else:
        raise ValueError(f"Unknown reward type: {reward_type}")

    # Run optimization
    optimizer.optimize(reward_composer=reward_composer, p_value=p_value)

    # Plot results
    optimizer.plot_results(p_value=p_value, save_path=save_path)


if __name__ == "__main__":
    # Parse command line arguments
    parser = argparse.ArgumentParser(description="Run reward optimization experiment")
    parser.add_argument(
        "--num_variables", type=int, default=4, help="Number of variables to optimize"
    )
    parser.add_argument(
        "--learning_rate",
        type=float,
        default=0.01,
        help="Learning rate for gradient descent",
    )
    parser.add_argument(
        "--num_steps", type=int, default=1000, help="Number of optimization steps"
    )
    parser.add_argument(
        "--p_value", type=float, default=-2.0, help="Parameter for p-mean composition"
    )
    parser.add_argument(
        "--slack",
        type=float,
        default=0.1,
        help="Slack parameter for curriculum composer",
    )
    parser.add_argument(
        "--reward_type",
        type=str,
        default="curriculum",
        choices=["curriculum", "pmean", "pmean_stable"],
        help="Type of reward composer to use",
    )
    parser.add_argument(
        "--competitiveness",
        type=float,
        default=0.2,
        help="Amount of competition between the objectives",
    )
    parser.add_argument(
        "--randomness",
        type=float,
        default=0.01,
        help="Amount of randomness in the initial variable values",
    )
    args = parser.parse_args()

    # Seed NumPy so the random initial values are reproducible
    np.random.seed(42)

    # Run main function with parsed arguments
    main(
        num_variables=args.num_variables,
        learning_rate=args.learning_rate,
        num_steps=args.num_steps,
        p_value=args.p_value,
        slack=args.slack,
        reward_type=args.reward_type,
        competitiveness=args.competitiveness,
        randomness=args.randomness,
    )
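
# Example invocation (illustrative; the flags mirror the argparse options above):
#   python operator_illustrator.py --reward_type pmean_stable --p_value -4.0 --num_steps 2000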