Commit 98b31daf by Ting PAN

add examples/GA3C

1 parent 72b4361c
# --------------------------------------------------------
# GA3C for Dragon
# Copyright(c) 2017 SeetaTech
# Written by Ting Pan
# --------------------------------------------------------
class Config(object):
#########################################################################
# Game configuration
# Name of the game, with version (e.g. PongDeterministic-v0)
ATARI_GAME = 'PongDeterministic-v0'
# Enable to see the trained agent in action
PLAY_MODE = False
# Enable to train
TRAIN_MODELS = True
# Load old models. Throws if the model doesn't exist
LOAD_CHECKPOINT = False
# If 0, the latest checkpoint is loaded
LOAD_EPISODE = 32000
#########################################################################
# Number of agents, predictors, trainers and other system settings
# If the dynamic configuration is on, these are the initial values.
# Number of Agents
AGENTS = 32
# Number of Predictors
PREDICTORS = 2
# Number of Trainers
TRAINERS = 2
# Device
DEVICE = 'gpu:0'
# Enable the dynamic adjustment (+ waiting time to start it)
DYNAMIC_SETTINGS = True
DYNAMIC_SETTINGS_STEP_WAIT = 20
DYNAMIC_SETTINGS_INITIAL_WAIT = 10
#########################################################################
# Algorithm parameters
# Discount factor
DISCOUNT = 0.99
# Tmax
TIME_MAX = 5
# Reward Clipping
REWARD_MIN = -1
REWARD_MAX = 1
# Max size of the queue
MAX_QUEUE_SIZE = 100
PREDICTION_BATCH_SIZE = 128
# Input of the DNN
STACKED_FRAMES = 4
IMAGE_WIDTH = 84
IMAGE_HEIGHT = 84
# Total number of episodes and annealing frequency
EPISODES = 400000
ANNEALING_EPISODE_COUNT = 400000
# Entropy regualrization hyper-parameter
BETA_START = 0.01
BETA_END = 0.01
# Learning rate
LEARNING_RATE_START = 0.0003
LEARNING_RATE_END = 0.0003
# RMSProp parameters
RMSPROP_DECAY = 0.99
RMSPROP_MOMENTUM = 0.0
RMSPROP_EPSILON = 0.1
# Dual RMSProp - we found that using a single RMSProp for the two cost function works better and faster
DUAL_RMSPROP = False
# Gradient clipping
USE_GRAD_CLIP = True
GRAD_CLIP_NORM = 40.0
# Epsilon (regularize policy lag in GA3C)
LOG_EPSILON = 1e-6
# Training min batch size - increasing the batch size increases the stability of the algorithm, but make learning slower
TRAINING_MIN_BATCH_SIZE = 0
#########################################################################
# Log and save
# Enable TensorBoard
TENSORBOARD = False
# Update TensorBoard every X training steps
TENSORBOARD_UPDATE_FREQUENCY = 1000
# Enable to save models every SAVE_FREQUENCY episodes
SAVE_MODELS = True
# Save every SAVE_FREQUENCY episodes
SAVE_FREQUENCY = 2000
# Print stats every PRINT_STATS_FREQUENCY episodes
PRINT_STATS_FREQUENCY = 1
# The window to average stats
STAT_ROLLING_MEAN_WINDOW = 1000
# Results filename
RESULTS_FILENAME = 'results.txt'
# Network checkpoint name
NETWORK_NAME = 'network'
#########################################################################
# More experimental parameters here
# Minimum policy
MIN_POLICY = 0.0
# Use log_softmax() instead of log(softmax())
USE_LOG_SOFTMAX = False
# --------------------------------------------------------
# GA3C for Dragon
# Copyright(c) 2017 SeetaTech
# Written by Ting Pan
# --------------------------------------------------------
import numpy as np
import scipy.misc as misc
from Config import Config
from GameManager import GameManager
class Queue(object):
def __init__(self, maxsize):
self._maxsize = maxsize
self.queue = []
def full(self):
return len(self.queue) >= self._maxsize
def get(self):
ret = self.queue[0]
self.queue = self.queue[1:]
return ret
def put(self, x):
self.queue.append(x)
def clear(self):
self.queue = []
class Environment:
def __init__(self):
self.game = GameManager(Config.ATARI_GAME, display=Config.PLAY_MODE)
self.nb_frames = Config.STACKED_FRAMES
self.frame_q = Queue(maxsize=self.nb_frames)
self.previous_state = None
self.current_state = None
self.total_reward = 0
self.reset()
@staticmethod
def _rgb2gray(rgb):
return np.dot(rgb[..., :3], [0.299, 0.587, 0.114])
@staticmethod
def _preprocess(image):
image = Environment._rgb2gray(image)
image = misc.imresize(image, [Config.IMAGE_HEIGHT, Config.IMAGE_WIDTH], 'bilinear')
image = image.astype(np.float32) / 128.0 - 1.0
return image
def _get_current_state(self):
if not self.frame_q.full():
return None # frame queue is not full yet.
x_ = np.array(self.frame_q.queue, dtype=np.float32)
return x_
def _update_frame_q(self, frame):
if self.frame_q.full():
self.frame_q.get()
image = Environment._preprocess(frame)
self.frame_q.put(image)
def get_num_actions(self):
return self.game.env.action_space.n
def reset(self):
self.total_reward = 0
self.frame_q.clear()
self._update_frame_q(self.game.reset())
self.previous_state = self.current_state = None
def step(self, action):
observation, reward, done, _ = self.game.step(action)
self.total_reward += reward
self._update_frame_q(observation)
self.previous_state = self.current_state
self.current_state = self._get_current_state()
return reward, done
\ No newline at end of file
# --------------------------------------------------------
# GA3C for Dragon
# Copyright(c) 2017 SeetaTech
# Written by Ting Pan
# --------------------------------------------------------
from Config import Config
from Server import Server
# Adjust configs for Play mode
if Config.PLAY_MODE:
Config.AGENTS = 1
Config.PREDICTORS = 1
Config.TRAINERS = 1
Config.DYNAMIC_SETTINGS = False
Config.LOAD_CHECKPOINT = True
Config.TRAIN_MODELS = False
Config.SAVE_MODELS = False
if __name__ == "__main__":
import dragon.config
dragon.config.EnableCUDA()
Server().main()
# --------------------------------------------------------
# GA3C for Dragon
# Copyright(c) 2017 SeetaTech
# Written by Ting Pan
# --------------------------------------------------------
import gym
class GameManager:
def __init__(self, game_name, display):
self.game_name = game_name
self.display = display
self.env = gym.make(game_name)
self.reset()
def reset(self):
observation = self.env.reset()
return observation
def step(self, action):
self._update_display()
observation, reward, done, info = self.env.step(action)
return observation, reward, done, info
def _update_display(self):
if self.display:
self.env.render()
# --------------------------------------------------------
# GA3C for Dragon
# Copyright(c) 2017 SeetaTech
# Written by Ting Pan
# --------------------------------------------------------
import threading
from dragon.core.tensor import Tensor
import dragon.vm.theano as theano
import dragon.vm.theano.tensor as T
import dragon.ops as ops
import dragon.core.workspace as ws
import dragon.updaters as updaters
from Config import Config
mutex = threading.Lock()
class NetworkVP:
def __init__(self, model_name, num_actions):
self.model_name = model_name
self.num_actions = num_actions
self.network_params = []
self.img_width = Config.IMAGE_WIDTH
self.img_height = Config.IMAGE_HEIGHT
self.img_channels = Config.STACKED_FRAMES
self.learning_rate = Config.LEARNING_RATE_START
self.beta = Config.BETA_START
self.log_epsilon = Config.LOG_EPSILON
self._create_graph()
if Config.PLAY_MODE:
ws
def _create_graph(self):
self.x = Tensor(shape=[None, self.img_channels, self.img_height, self.img_width]).Variable()
self.y_r = Tensor(shape=[None], name='Yr').Variable()
# As implemented in A3C paper
self.n1 = ops.Relu(ops.Conv2D([self.x] + self.weight_bias(), kernel_size=8, stride=4, num_output=16))
self.n2 = ops.Relu(ops.Conv2D([self.n1] + self.weight_bias(), kernel_size=4, stride=2, num_output=32))
self.action_index = Tensor(shape=[None, self.num_actions]).Variable()
self.d1 = ops.Relu(ops.InnerProduct([self.n2] + self.weight_bias(), num_output=256))
self.logits_v = ops.InnerProduct([self.d1] + self.weight_bias(), num_output=1)
self.cost_v = ops.L2Loss([self.y_r, self.logits_v])
self.logits_p = ops.InnerProduct([self.d1] + self.weight_bias(), num_output=self.num_actions)
if Config.USE_LOG_SOFTMAX: raise NotImplementedError()
else:
self.softmax_p = ops.Softmax(self.logits_p)
self.selected_action_prob = ops.Sum(self.softmax_p * self.action_index, axis=1)
self.cost_p_1 = ops.Log(ops.Clip(self.selected_action_prob, self.log_epsilon, None)) * \
(self.y_r - ops.StopGradient(self.logits_v))
self.cost_p_2 = ops.Sum(ops.Log(ops.Clip(self.softmax_p, self.log_epsilon, None)) *
self.softmax_p, axis=1) * (-self.beta)
self.cost_p_1_agg = ops.Sum(self.cost_p_1)
self.cost_p_2_agg = ops.Sum(self.cost_p_2)
self.cost_p = -(self.cost_p_1_agg + self.cost_p_2_agg)
self.cost_all = self.cost_p + self.cost_v
if Config.DUAL_RMSPROP: raise NotImplementedError()
else:
if Config.USE_GRAD_CLIP:
self.opt = updaters.RMSPropUpdater(decay=Config.RMSPROP_DECAY,
eps=Config.RMSPROP_EPSILON,
clip_gradient=Config.GRAD_CLIP_NORM)
else:
self.opt = updaters.RMSPropUpdater(decay=Config.RMSPROP_DECAY,
eps=Config.RMSPROP_EPSILON)
grads = T.grad(self.cost_all, self.network_params)
for p, g in zip(self.network_params, grads):
self.opt.append((p, g), lr_mult=1.0)
def weight_bias(self, weights_init=None, no_bias=False):
if weights_init is None:
weight = Tensor().Xavier()
else:
weight = weights_init
if no_bias:
self.network_params.extend([weight])
return [weight]
bias = Tensor().Constant(value=0)
self.network_params.extend([weight, bias])
return [weight, bias]
def predict_single(self, x):
return self.predict_p(x[None, :])[0]
def predict_v(self, x):
if not hasattr(self, '_predict_p'):
self._predict_v = theano.function(inputs=self.x, outputs=self.logits_v)
return self._predict_v(x)
def predict_p(self, x):
if not hasattr(self, '_predict_p'):
self._predict_p = theano.function(inputs=self.x, outputs=self.softmax_p)
return self._predict_p(x)
def predict_p_and_v(self, x):
if not hasattr(self, '_predict_p_and_v'):
self._predict_p_and_v = theano.function(inputs=self.x, outputs=[self.softmax_p, self.logits_v])
global mutex
mutex.acquire()
p, v = self._predict_p_and_v(x)
mutex.release()
return p, v
def train(self, x, y_r, a):
if not hasattr(self, '_train'):
self._compute = theano.function(inputs=[self.x, self.y_r, self.action_index],
outputs=self.cost_all)
self._train = theano.function(updater=self.opt)
global mutex
mutex.acquire()
loss = self._compute(x, y_r, a)
mutex.release()
self._train()
return loss
def save(self, episode):
filename = 'checkpoints/%s_%08d' % (self.model_name, episode)
ws.Snapshot(self.network_params, filename)
def load(self):
filename = 'checkpoints/%s_%08d.bin' % (self.model_name, Config.LOAD_EPISODE)
ws.Restore(filename)
return Config.LOAD_EPISODE
\ No newline at end of file
# --------------------------------------------------------
# GA3C for Dragon
# Copyright(c) 2017 SeetaTech
# Written by Ting Pan
# --------------------------------------------------------
from datetime import datetime
from multiprocessing import Process, Queue, Value
import numpy as np
import time
from Config import Config
from Environment import Environment
class Experience(object):
def __init__(self, state, action, prob, reward, done):
self.state = state
self.action = action
self.prob = prob
self.reward = reward
self.done = done
class ProcessAgent(Process):
def __init__(self, id, prediction_q, training_q, episode_log_q):
super(ProcessAgent, self).__init__()
self.id = id
self.prediction_q = prediction_q
self.training_q = training_q
self.episode_log_q = episode_log_q
self.env = Environment()
self.num_actions = self.env.get_num_actions()
self.actions = np.arange(self.num_actions)
self.discount_factor = Config.DISCOUNT
# one frame at a time
self.wait_q = Queue(maxsize=1)
self.exit_flag = Value('i', 0)
@staticmethod
def _accumulate_rewards(experiences, discount_factor, terminal_reward):
reward_sum = terminal_reward
for t in reversed(range(0, len(experiences) - 1)):
r = np.clip(experiences[t].reward, Config.REWARD_MIN, Config.REWARD_MAX)
reward_sum = discount_factor * reward_sum + r
experiences[t].reward = reward_sum
return experiences[:-1]
def convert_data(self, experiences):
x_ = np.array([exp.state for exp in experiences])
a_ = np.eye(self.num_actions)[np.array([exp.action for exp in experiences])].astype(np.float32)
r_ = np.array([exp.reward for exp in experiences], dtype=np.float32) # R
r_ = r_.reshape((-1, 1))
return x_, r_, a_
def predict(self, state):
# put the state in the prediction q
self.prediction_q.put((self.id, state))
# wait for the prediction to come back
p, v = self.wait_q.get()
return p, v
def select_action(self, prediction):
if Config.PLAY_MODE:
action = np.argmax(prediction)
else:
action = np.random.choice(self.actions, p=prediction)
return action
def run_episode(self):
self.env.reset()
done = False
experiences = []
time_count = 0
reward_sum = 0.0
while not done:
# very first few frames
if self.env.current_state is None:
self.env.step(0) # 0 == NOOP
continue
prediction, value = self.predict(self.env.current_state)
action = self.select_action(prediction)
reward, done = self.env.step(action)
reward_sum += reward
exp = Experience(self.env.previous_state, action, prediction, reward, done)
experiences.append(exp)
if done or time_count == Config.TIME_MAX:
terminal_reward = 0 if done else value
updated_exps = ProcessAgent._accumulate_rewards(experiences, self.discount_factor, terminal_reward)
x_, r_, a_ = self.convert_data(updated_exps)
yield x_, r_, a_, reward_sum
# reset the tmax count
time_count = 0
# keep the last experience for the next batch
experiences = [experiences[-1]]
reward_sum = 0.0
time_count += 1
def run(self):
# randomly sleep up to 1 second. helps agents boot smoothly.
time.sleep(np.random.rand())
np.random.seed(np.int32(time.time() % 1 * 1000 + self.id * 10))
while self.exit_flag.value == 0:
total_reward = 0
total_length = 0
for x_, r_, a_, reward_sum in self.run_episode():
total_reward += reward_sum
total_length += len(r_) + 1 # +1 for last frame that we drop
self.training_q.put((x_, r_, a_))
self.episode_log_q.put((datetime.now(), total_reward, total_length))
# --------------------------------------------------------
# GA3C for Dragon
# Copyright(c) 2017 SeetaTech
# Written by Ting Pan
# --------------------------------------------------------
import sys
if sys.version_info >= (3, 0):
from queue import Queue as queueQueue
else:
from Queue import Queue as queueQueue
from datetime import datetime
from multiprocessing import Process, Queue, Value
import numpy as np
import time
from Config import Config
class ProcessStats(Process):
def __init__(self):
super(ProcessStats, self).__init__()
self.episode_log_q = Queue(maxsize=100)
self.episode_count = Value('i', 0)
self.training_count = Value('i', 0)
self.should_save_model = Value('i', 0)
self.trainer_count = Value('i', 0)
self.predictor_count = Value('i', 0)
self.agent_count = Value('i', 0)
self.total_frame_count = 0
def FPS(self):
# average FPS from the beginning of the training (not current FPS)
return np.ceil(self.total_frame_count / (time.time() - self.start_time))
def TPS(self):
# average TPS from the beginning of the training (not current TPS)
return np.ceil(self.training_count.value / (time.time() - self.start_time))
def run(self):
with open(Config.RESULTS_FILENAME, 'a') as results_logger:
rolling_frame_count = 0
rolling_reward = 0
results_q = queueQueue(maxsize=Config.STAT_ROLLING_MEAN_WINDOW)
self.start_time = time.time()
first_time = datetime.now()
while True:
episode_time, reward, length = self.episode_log_q.get()
results_logger.write('%s, %d, %d\n' % (episode_time.strftime("%Y-%m-%d %H:%M:%S"), reward, length))
results_logger.flush()
self.total_frame_count += length
self.episode_count.value += 1
rolling_frame_count += length
rolling_reward += reward
if results_q.full():
old_episode_time, old_reward, old_length = results_q.get()
rolling_frame_count -= old_length
rolling_reward -= old_reward
first_time = old_episode_time
results_q.put((episode_time, reward, length))
if self.episode_count.value % Config.SAVE_FREQUENCY == 0:
self.should_save_model.value = 1
if self.episode_count.value % Config.PRINT_STATS_FREQUENCY == 0:
print(
'[Time: %8d] '
'[Episode: %8d Score: %10.4f] '
'[RScore: %10.4f RPPS: %5d] '
'[PPS: %5d TPS: %5d] '
'[NT: %2d NP: %2d NA: %2d]'
% (int(time.time() - self.start_time),
self.episode_count.value, reward,
rolling_reward / results_q.qsize(),
rolling_frame_count / (datetime.now() - first_time).total_seconds(),
self.FPS(), self.TPS(),
self.trainer_count.value, self.predictor_count.value, self.agent_count.value))
sys.stdout.flush()
# GA3C [TinyDragon Style]
This is a Dragon implementation of GA3C comparing to [NVlabs](https://github.com/NVlabs/GA3C) based on TensorFlow.
GA3C is a hybrid CPU/GPU version of the Asynchronous Advantage Actor-Critic (A3C) algorithm.
Currently the state-of-the-art method in reinforcement learning for various gaming tasks.
This CPU/GPU implementation, based on Dragon, achieves a significant speed up compared to a similar CPU implementation.
**```Attention```**: GA3C does not support **Windows**, the hybrid Thread/Process will trigger a deadlock if trainers or predictors >=2.
## How do I get set up? ###
* Install [Dragon](https://github.com/neopenx/Dragon)
* Install [OpenAI Gym](https://github.com/openai/gym)
## How to Train a model from scratch? ###
Run GA3C.
You can modify the training parameters directly in `Config.py`.
The output should look like below:
...
[Time: 33] [Episode: 26 Score: -19.0000] [RScore: -20.5000 RPPS: 822] [PPS: 823 TPS: 183] [NT: 2 NP: 2 NA: 32]
[Time: 33] [Episode: 27 Score: -20.0000] [RScore: -20.4815 RPPS: 855] [PPS: 856 TPS: 183] [NT: 2 NP: 2 NA: 32]
[Time: 35] [Episode: 28 Score: -20.0000] [RScore: -20.4643 RPPS: 854] [PPS: 855 TPS: 185] [NT: 2 NP: 2 NA: 32]
[Time: 35] [Episode: 29 Score: -19.0000] [RScore: -20.4138 RPPS: 877] [PPS: 878 TPS: 185] [NT: 2 NP: 2 NA: 32]
[Time: 36] [Episode: 30 Score: -20.0000] [RScore: -20.4000 RPPS: 899] [PPS: 900 TPS: 186] [NT: 2 NP: 2 NA: 32]
...
**PPS** (predictions per second) demonstrates the speed of processing frames, while **Score** shows the achieved score.
**RPPS** and **RScore** are the rolling average of the above values.
To stop the training procedure, adjuts `EPISODES` in `Config.py` propoerly, or simply use ctrl + c.
## How to continue training a model? ###
If you want to continue training a model, set `LOAD_CHECKPOINTS=True` in `Config.py`.
Set `LOAD_EPISODE` to the episode number you want to load.
Be sure that the corresponding model has been saved in the checkpoints folder (the model name includes the number of the episode).
## How to play a game with a trained agent? ###
set `PLAY_MODE=True` and set `LOAD_EPISODE=xxxx` in `Config.py`
Run GA3C.py
## How to change the game, configurations, etc.? ###
All the configurations are in `Config.py`
## Sample learning curves
Typical learning curves for Pong and Boxing are shown here. These are easily obtained from the results.txt file.
![Convergence Curves](http://mb2.web.engr.illinois.edu/images/pong_boxing.png)
### References ###
If you use this code, please refer to [ICLR 2017 paper](https://openreview.net/forum?id=r1VGvBcxl):
```
@conference{babaeizadeh2017ga3c,
title={Reinforcement Learning thorugh Asynchronous Advantage Actor-Critic on a GPU},
author={Babaeizadeh, Mohammad and Frosio, Iuri and Tyree, Stephen and Clemons, Jason and Kautz, Jan},
booktitle={ICLR},
biurl={https://openreview.net/forum?id=r1VGvBcxl},
year={2017}
}
```
This work was first presented in an oral talk at the [The 1st International Workshop on Efficient Methods for Deep Neural Networks](http://allenai.org/plato/emdnn/papers.html), NIPS Workshop, Barcelona (Spain), Dec. 9, 2016:
```
@article{babaeizadeh2016ga3c,
title={{GA3C:} {GPU}-based {A3C} for Deep Reinforcement Learning},
author={Babaeizadeh, Mohammad and Frosio, Iuri and Tyree, Stephen and Clemons, Jason and Kautz, Jan},
journal={NIPS Workshop},
biurl={arXiv preprint arXiv:1611.06256},
year={2016}
}
```
# --------------------------------------------------------
# GA3C for Dragon
# Copyright(c) 2017 SeetaTech
# Written by Ting Pan
# --------------------------------------------------------
from multiprocessing import Queue
import time
from Config import Config
from Environment import Environment
from NetworkVP import NetworkVP
from ProcessAgent import ProcessAgent
from ProcessStats import ProcessStats
from ThreadDynamicAdjustment import ThreadDynamicAdjustment
from ThreadPredictor import ThreadPredictor
from ThreadTrainer import ThreadTrainer
class Server(object):
def __init__(self):
self.stats = ProcessStats()
self.training_q = Queue(maxsize=Config.MAX_QUEUE_SIZE)
self.prediction_q = Queue(maxsize=Config.MAX_QUEUE_SIZE)
self.model = NetworkVP(Config.NETWORK_NAME, Environment().get_num_actions())
if Config.LOAD_CHECKPOINT:
self.stats.episode_count.value = self.model.load()
self.training_step = 0
self.frame_counter = 0
self.agents = []
self.predictors = []
self.trainers = []
self.dynamic_adjustment = ThreadDynamicAdjustment(self)
def add_agent(self):
self.agents.append(
ProcessAgent(len(self.agents), self.prediction_q, self.training_q, self.stats.episode_log_q))
self.agents[-1].start()
def remove_agent(self):
self.agents[-1].exit_flag.value = True
self.agents[-1].join()
self.agents.pop()
def add_predictor(self):
self.predictors.append(ThreadPredictor(self, len(self.predictors)))
self.predictors[-1].start()
def remove_predictor(self):
self.predictors[-1].exit_flag = True
self.predictors[-1].join()
self.predictors.pop()
def add_trainer(self):
self.trainers.append(ThreadTrainer(self, len(self.trainers)))
self.trainers[-1].start()
def remove_trainer(self):
self.trainers[-1].exit_flag = True
self.trainers[-1].join()
self.trainers.pop()
def train_model(self, x_, r_, a_):
self.model.train(x_, r_, a_)
self.training_step += 1
self.frame_counter += x_.shape[0]
self.stats.training_count.value += 1
self.dynamic_adjustment.temporal_training_count += 1
if Config.TENSORBOARD and self.stats.training_count.value % Config.TENSORBOARD_UPDATE_FREQUENCY == 0:
self.model.log(x_, r_, a_)
def save_model(self):
self.model.save(self.stats.episode_count.value)
def main(self):
self.stats.start()
self.dynamic_adjustment.start()
if Config.PLAY_MODE:
for trainer in self.trainers:
trainer.enabled = False
learning_rate_multiplier = (Config.LEARNING_RATE_END - Config.LEARNING_RATE_START) / Config.ANNEALING_EPISODE_COUNT
while self.stats.episode_count.value < Config.EPISODES:
step = min(self.stats.episode_count.value, Config.ANNEALING_EPISODE_COUNT - 1)
self.model.opt.lr = Config.LEARNING_RATE_START + learning_rate_multiplier * step
# Saving is async - even if we start saving at a given episode, we may save the model at a later episode
if Config.SAVE_MODELS and self.stats.should_save_model.value > 0:
self.save_model()
self.stats.should_save_model.value = 0
time.sleep(0.01)
self.dynamic_adjustment.exit_flag = True
while self.agents:
self.remove_agent()
while self.predictors:
self.remove_predictor()
while self.trainers:
self.remove_trainer()
# --------------------------------------------------------
# GA3C for Dragon
# Copyright(c) 2017 SeetaTech
# Written by Ting Pan
# --------------------------------------------------------
from threading import Thread
import numpy as np
import time
from Config import Config
class ThreadDynamicAdjustment(Thread):
def __init__(self, server):
super(ThreadDynamicAdjustment, self).__init__()
self.setDaemon(True)
self.server = server
self.enabled = Config.DYNAMIC_SETTINGS
self.trainer_count = Config.TRAINERS
self.predictor_count = Config.PREDICTORS
self.agent_count = Config.AGENTS
self.temporal_training_count = 0
self.exit_flag = False
def enable_disable_components(self):
cur_len = len(self.server.trainers)
if cur_len < self.trainer_count:
for _ in np.arange(cur_len, self.trainer_count):
self.server.add_trainer()
elif cur_len > self.trainer_count:
for _ in np.arange(self.trainer_count, cur_len):
self.server.remove_trainer()
cur_len = len(self.server.predictors)
if cur_len < self.predictor_count:
for _ in np.arange(cur_len, self.predictor_count):
self.server.add_predictor()
elif cur_len > self.predictor_count:
for _ in np.arange(self.predictor_count, cur_len):
self.server.remove_predictor()
cur_len = len(self.server.agents)
if cur_len < self.agent_count:
for _ in np.arange(cur_len, self.agent_count):
self.server.add_agent()
elif cur_len > self.agent_count:
for _ in np.arange(self.agent_count, cur_len):
self.server.remove_agent()
def random_walk(self):
# 3 directions, 1 for Trainers, 1 for Predictors and 1 for Agents
# 3 outcome for each, -1: add one, 0: no change, 1: remove one
direction = np.random.randint(3, size=3) - 1
self.trainer_count = max(1, self.trainer_count - direction[0])
self.predictor_count = max(1, self.predictor_count - direction[1])
self.agent_count = max(1, self.agent_count - direction[2])
def update_stats(self):
self.server.stats.trainer_count.value = self.trainer_count
self.server.stats.predictor_count.value = self.predictor_count
self.server.stats.agent_count.value = self.agent_count
def run(self):
self.enable_disable_components()
self.update_stats()
if not self.enabled: return
# Wait for initialization
time.sleep(Config.DYNAMIC_SETTINGS_INITIAL_WAIT)
while not self.exit_flag:
old_trainer_count, old_predictor_count, old_agent_count = \
self.trainer_count, self.predictor_count, self.agent_count
self.random_walk()
# If no change, do nothing
if self.trainer_count == old_trainer_count \
and self.predictor_count == old_predictor_count \
and self.agent_count == old_agent_count:
continue
old_count = self.temporal_training_count
self.enable_disable_components()
self.temporal_training_count = 0
time.sleep(Config.DYNAMIC_SETTINGS_STEP_WAIT)
cur_count = self.temporal_training_count
# if it didn't work, revert the changes
if cur_count < old_count:
self.trainer_count, self.predictor_count, self.agent_count = \
old_trainer_count, old_predictor_count, old_agent_count
self.update_stats()
\ No newline at end of file
# --------------------------------------------------------
# GA3C for Dragon
# Copyright(c) 2017 SeetaTech
# Written by Ting Pan
# --------------------------------------------------------
from threading import Thread
import numpy as np
from Config import Config
class ThreadPredictor(Thread):
def __init__(self, server, id):
super(ThreadPredictor, self).__init__()
self.setDaemon(True)
self.id = id
self.server = server
self.exit_flag = False
def run(self):
ids = np.zeros(Config.PREDICTION_BATCH_SIZE, dtype=np.uint16)
states = np.zeros(
(Config.PREDICTION_BATCH_SIZE, Config.STACKED_FRAMES, Config.IMAGE_HEIGHT, Config.IMAGE_WIDTH, ),
dtype=np.float32)
while not self.exit_flag:
ids[0], states[0] = self.server.prediction_q.get()
size = 1
while size < Config.PREDICTION_BATCH_SIZE and not self.server.prediction_q.empty():
ids[size], states[size] = self.server.prediction_q.get()
size += 1
batch = states[:size]
p, v = self.server.model.predict_p_and_v(batch)
for i in range(size):
if ids[i] < len(self.server.agents):
self.server.agents[ids[i]].wait_q.put((p[i], v[i]))
# --------------------------------------------------------
# GA3C for Dragon
# Copyright(c) 2017 SeetaTech
# Written by Ting Pan
# --------------------------------------------------------
from threading import Thread
import numpy as np
from Config import Config
class ThreadTrainer(Thread):
def __init__(self, server, id):
super(ThreadTrainer, self).__init__()
self.setDaemon(True)
self.id = id
self.server = server
self.exit_flag = False
def run(self):
while not self.exit_flag:
batch_size = 0
while batch_size <= Config.TRAINING_MIN_BATCH_SIZE:
x_, r_, a_ = self.server.training_q.get()
if batch_size == 0:
x__ = x_
r__ = r_
a__ = a_
else:
x__ = np.concatenate((x__, x_))
r__ = np.concatenate((r__, r_))
a__ = np.concatenate((a__, a_))
batch_size += x_.shape[0]
if Config.TRAIN_MODELS:
self.server.train_model(x__, r__, a__)
\ No newline at end of file
......@@ -8,4 +8,6 @@ which was described in our arXiv paper: [Dragon: A Computation Graph Virtual Mac
## <a name="list-of-examples"></a>List of examples
* [cifar10](https://github.com/neopenx/Dragon/tree/master/examples/cifar10) - How to train/infer a basic classification network [**Caffe1** Style]
* [cifar10](https://github.com/neopenx/Dragon/tree/master/examples/cifar10) - How to train/infer a basic classification network [*Caffe1 Style*]
* [GA3C](https://github.com/neopenx/Dragon/tree/master/examples/GA3C) - A hybrid CPU/GPU version of the A3C algorithm [*TinyDragon Style*]
Markdown is supported
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!