#!/usr/bin/env python
# deep_q_network_actual.py
from __future__ import print_function
import tensorflow as tf
import cv2
import sys
sys.path.append("game/")
import wrapped_flappy_bird as game
import random
import numpy as np
from collections import deque
# Size of the discrete action space; actions are encoded one-hot and
# index 0 is the "do nothing" action.
ACTIONS = 2
# Discount factor applied to the bootstrapped next-state Q-value.
GAMMA = 0.99
# Number of timesteps spent only filling replay memory before training.
OBSERVE = 1e5
# Number of frames over which epsilon is annealed.
EXPLORE = 2e6
# Lower bound of the exploration rate.
FINAL_EPSILON = 1e-4
# Starting exploration rate (equal to FINAL_EPSILON, so epsilon is constant).
INITIAL_EPSILON = 1e-4
# Maximum number of transitions kept in replay memory.
REPLAY_MEMORY = 50000
# Number of transitions sampled per gradient step.
BATCH = 32
# A new action is chosen every FRAME_PER_ACTION timesteps.
FRAME_PER_ACTION = 1
def createNetwork():
    """Build the convolutional Q-network.

    The network takes a stack of four 80x80 frames (shape ``(80, 80, 4)``)
    and emits one linear output per action (``ACTIONS`` values).

    Returns:
        An uncompiled ``tf.keras.Model``.
    """
    layers = tf.keras.layers
    frames = tf.keras.Input(shape=(80, 80, 4))

    # Layers are listed in the order they are applied to the input.
    pipeline = [
        layers.Conv2D(32, (8, 8), strides=4, activation='relu'),
        layers.MaxPooling2D((2, 2)),
        layers.Conv2D(64, (4, 4), strides=2, activation='relu'),
        layers.Conv2D(64, (3, 3), strides=1, activation='relu'),
        layers.Flatten(),
        layers.Dense(512, activation='relu'),
        # Linear head: one Q-value per action.
        layers.Dense(ACTIONS),
    ]

    tensor = frames
    for layer in pipeline:
        tensor = layer(tensor)

    return tf.keras.Model(inputs=frames, outputs=tensor)
def _preprocess_frame(frame):
    """Shrink a raw game frame to an 80x80 binary (0/255) grayscale image."""
    gray = cv2.cvtColor(cv2.resize(frame, (80, 80)), cv2.COLOR_BGR2GRAY)
    _, binary = cv2.threshold(gray, 1, 255, cv2.THRESH_BINARY)
    return binary


def trainNetwork(model):
    """Run the DQN training loop against the game emulator.

    Each timestep the loop picks an action epsilon-greedily from the
    network's Q-values, steps the emulator, stores the transition in a
    bounded replay memory and, once more than ``OBSERVE`` timesteps have
    elapsed, performs one gradient step on a random minibatch. Loss and
    max Q-value are written to TensorBoard under ``logs/`` and checkpoints
    are saved to ``saved_networks`` every 10000 timesteps.

    Args:
        model: Keras model mapping a ``(batch, 80, 80, 4)`` frame stack to
            ``ACTIONS`` Q-values.

    This function loops forever and does not return.
    """
    optimizer = tf.optimizers.Adam(learning_rate=1e-6)

    @tf.function
    def train_step(y_batch, a_batch, s_j_batch):
        """Apply one optimizer update on the squared TD error; return the loss."""
        with tf.GradientTape() as tape:
            # Q-values must be computed from the *states*; the one-hot
            # actions only select which output is regressed to the target.
            readout = model(s_j_batch, training=True)
            readout_action = tf.reduce_sum(readout * a_batch, axis=1)
            cost = tf.reduce_mean(tf.square(y_batch - readout_action))
        gradients = tape.gradient(cost, model.trainable_variables)
        optimizer.apply_gradients(zip(gradients, model.trainable_variables))
        return cost

    # open up a game state to communicate with emulator
    game_state = game.GameState()

    # replay memory; maxlen discards the oldest transition automatically
    D = deque(maxlen=REPLAY_MEMORY)

    # summary writer for TensorBoard
    writer = tf.summary.create_file_writer("logs/")

    # get the first state by doing nothing and preprocess the image to 80x80x4
    do_nothing = np.zeros(ACTIONS)
    do_nothing[0] = 1
    x_t, _, _ = game_state.frame_step(do_nothing)
    x_t = _preprocess_frame(x_t)
    s_t = np.stack((x_t, x_t, x_t, x_t), axis=2)

    # saving and loading networks
    checkpoint = tf.train.Checkpoint(optimizer=optimizer, model=model)
    checkpoint_manager = tf.train.CheckpointManager(
        checkpoint, directory="saved_networks", max_to_keep=3)
    if checkpoint_manager.latest_checkpoint:
        checkpoint.restore(checkpoint_manager.latest_checkpoint)
        print("Successfully loaded:", checkpoint_manager.latest_checkpoint)
    else:
        print("Could not find old network weights")

    # start training
    epsilon = INITIAL_EPSILON
    t = 0
    while True:
        # choose an action epsilon greedily; the model is called directly
        # because predict() adds per-call overhead for a single sample
        readout_t = model(
            s_t[np.newaxis].astype(np.float32), training=False).numpy()
        a_t = np.zeros([ACTIONS])
        action_index = 0
        if t % FRAME_PER_ACTION == 0:
            if random.random() <= epsilon:
                print("----------Random Action----------")
                action_index = random.randrange(ACTIONS)
            else:
                action_index = np.argmax(readout_t)
            # use the same index that is logged below, so the reported
            # action is always the one actually executed
            a_t[action_index] = 1
        else:
            a_t[0] = 1  # do nothing

        # scale down epsilon
        if epsilon > FINAL_EPSILON and t > OBSERVE:
            epsilon -= (INITIAL_EPSILON - FINAL_EPSILON) / EXPLORE

        # run the selected action and observe next state and reward
        x_t1_colored, r_t, terminal = game_state.frame_step(a_t)
        x_t1 = np.reshape(_preprocess_frame(x_t1_colored), (80, 80, 1))
        # newest frame goes first; the oldest of the four is dropped
        s_t1 = np.append(x_t1, s_t[:, :, :3], axis=2)

        # store the transition in D
        D.append((s_t, a_t, r_t, s_t1, terminal))

        # only train if done observing
        if t > OBSERVE:
            # sample a minibatch to train on
            minibatch = random.sample(D, BATCH)
            s_j_batch, a_batch, r_batch, s_j1_batch, done_batch = zip(*minibatch)

            # float32 throughout so the batches match the network's dtype
            s_j_batch = np.asarray(s_j_batch, dtype=np.float32)
            a_batch = np.asarray(a_batch, dtype=np.float32)
            r_batch = np.asarray(r_batch, dtype=np.float32)
            s_j1_batch = np.asarray(s_j1_batch, dtype=np.float32)
            done_batch = np.asarray(done_batch, dtype=bool)

            # TD target: reward only for terminal transitions, otherwise
            # reward plus the discounted best next-state Q-value
            readout_j1_batch = model(s_j1_batch, training=False).numpy()
            y_batch = np.where(
                done_batch,
                r_batch,
                r_batch + GAMMA * readout_j1_batch.max(axis=1),
            ).astype(np.float32)

            # perform gradient step
            cost_value = train_step(y_batch, a_batch, s_j_batch)

            # Log the loss and Q-values to TensorBoard
            with writer.as_default():
                tf.summary.scalar("loss", cost_value, step=t)
                tf.summary.scalar("q_value", np.max(readout_t), step=t)

        # update the old values
        s_t = s_t1
        t += 1

        # save progress every 10000 iterations
        if t % 10000 == 0:
            checkpoint_manager.save()

        # print info
        if t <= OBSERVE:
            state = "observe"
        elif t <= OBSERVE + EXPLORE:
            state = "explore"
        else:
            state = "train"
        print("TIMESTEP", t, "/ STATE", state,
              "/ EPSILON", epsilon, "/ ACTION", action_index, "/ REWARD", r_t,
              "/ Q_MAX %e" % np.max(readout_t))
def playGame():
    """Build a fresh Q-network and hand it to the training loop."""
    trainNetwork(createNetwork())
def main():
    """Script entry point: delegate to playGame()."""
    playGame()
# Start training only when run as a script, not when imported as a module.
if __name__ == "__main__":
    main()