オセロをランダムからChainerを使ったAlphaZeroもどきへと進化させながら実装してみた

これはChainer/CuPy Advent Calendar 2018の25日目です。パーソルキャリア Advent Calendar 2018の25日目でもあります。

クリスマスにオセロしてくれる相手が欲しかったので簡単なオセロゲームを実装してみました。

今回のコード

下記、タグv0.0.0になります。ご査収くださいませ。*1

github.com

開発はMacで行いましたがDockerで動くのでおそらく他のOSでも動作すると思います。 Docker imageはchainer/chainerv5.1.0-python3です。 調べた感じだと各ライブラリのバージョンは下記のようです。

遊び方

何はともあれgit cloneします。

$ git clone -b v0.0.0 https://github.com/nihemak/heta-reversi.git
$ cd heta-reversi

あとはREADMEの通りですが、最初にDocker環境を立ち上げて中に入ります。この環境で遊ぶことになります。

$ docker-compose up -d --build
$ docker-compose exec python3 bash

ランダムと遊ぶ場合は下記です。

$ export PYTHONIOENCODING=utf-8; python3 main.py play-random

f:id:nihma:20181224123823p:plain f:id:nihma:20181224123844p:plain

原始モンテカルロと遊ぶ場合は下記です。

$ export PYTHONIOENCODING=utf-8; python3 main.py play-primitive-monte-carlo

モンテカルロ木探索(Monte Carlo Tree Search。以下、MCTS)と遊ぶ場合は下記です。

$ export PYTHONIOENCODING=utf-8; python3 main.py play-mcts

SLポリシーネットワークとAPV-MCTS(Asynchronous Policy and Value Monte Carlo Tree Search)はデュアルネットワークのモデルが必要ですが初期状態(トレーニングなし)のモデルでも遊ぶことができます。

SLポリシーネットワーク(初期状態のモデル)と遊ぶ場合は下記です。

$ export PYTHONIOENCODING=utf-8; python3 main.py play-sl-policy-network-random

APV-MCTS(初期状態のモデル)と遊ぶ場合は下記です。

$ export PYTHONIOENCODING=utf-8; python3 main.py play-apv-mcts-random

APV-MCTSのセルフプレイによる強化学習によるトレーニング済みのデュアルネットワークのモデルを作成する場合は下記です。完了するとモデルデータファイル(data/model_xxxxxxxx.dat)が生成されます。*2

$ export PYTHONIOENCODING=utf-8; python3 main.py create-model

レーニング済みのデュアルネットワークのモデルを指定してさらにトレーニングすることもできます。

$ export PYTHONIOENCODING=utf-8; python3 main.py create-model data/model_xxxxxxxx.dat

SLポリシーネットワーク(トレーニング済みのモデル)と遊ぶ場合は下記です。

$ export PYTHONIOENCODING=utf-8; python3 main.py play-sl-policy-network data/model_xxxxxxxx.dat

APV-MCTS(トレーニング済みのモデル)と遊ぶ場合は下記です。

$ export PYTHONIOENCODING=utf-8; python3 main.py play-sl-policy-network data/model_xxxxxxxx.dat

遊んだ時に得られる対戦ログ(data/playdata_xxxxxxxx.dat)でリプレイを観賞することもできます。

$ export PYTHONIOENCODING=utf-8; python3 main.py replay data/playdata_xxxxxxxx.dat

遊び終わって気が済んだらDocker環境を停止します。

$ exit
$ docker-compose down

実装概要

詳細はGitHubにコードがありますのでここでは概要だけ記していきます。*3

オセロ

オセロ盤は一次元配列で表現し1を黒石、-1を白石としました。

main.py: L15-L19

def get_init_board():
    board = np.array([0] * 64, dtype=np.float32)
    board[28] = board[35] = 1
    board[27] = board[36] = -1
    return board

石を置ける場所は自分の石で相手の石を挟める場所なので単純に下記が成り立つ場所を探すようにしました。

  • 石が置かれてない
  • 隣(上下左右斜め)に相手の石が続いて存在しその隣に自分の石がある

main.py: L21-L61

def get_flip_positions_by_row_column(board, is_black, row, column, row_add, column_add):
    position_nums = []

    own, pair = (1, -1) if is_black else (-1, 1)

    row    += row_add
    column += column_add
    exists = False
    while row >= 0 and row < 8 and column >= 0 and column < 8:
        position_num = row * 8 + column

        if exists == True and board[position_num] == own:
            break
        if board[position_num] != pair:
            position_nums = []
            break

        position_nums.append(position_num)

        exists = True
        row    += row_add
        column += column_add

    return position_nums

def get_flip_positions(board, is_black, position_num):
    position_nums = []

    if not (position_num >= 0 and position_num <= 63):
        return position_nums
    if board[position_num] != 0:
        return position_nums

    column = position_num % 8
    row    = int((position_num - column) / 8)

    row_column_adds = ((0,1),(0,-1),(1,0),(-1,0),(1,1),(1,-1),(-1,1),(-1,-1))
    for row_add, column_add in row_column_adds:
        position_nums.extend(get_flip_positions_by_row_column(board, is_black, row, column, row_add, column_add))

    return position_nums

そして各アルゴリズムごとに実装した、上記の結果を使って次の一手を決めるchoice_xxx関数を2つ受け取りゲームを進めるようにしました。各アルゴリズムごとのchoice_xxx関数の実装は後述します。

main.py: L574-L602

def game(choice_black, choice_white, board = None, is_render = True, limit_step_num = None):
    steps = []
    if board is None:
        board = get_init_board()
    player = get_player(board)
    step_num = 0
    while True:
        if limit_step_num is not None and step_num >= limit_step_num:
            break
        board, is_black, _ = player
        if is_render:
            render_board(player)
        if is_end_game(steps, player):
            break
        position_num = None
        if is_putable(player):
            choice = choice_black if is_black else choice_white
            choice_data = choice(player)
            position_num = choice_data["position_num"]
            if is_render:
                print(position_num)
            board = put(player, position_num)
        else:
            if is_render:
                print("pass")
        steps.append((player, choice_data))
        player = get_player(board, not is_black)
        step_num += 1
    return steps

後述のモンテカルロのプレイアウトやAPV-MCTSのセルフプレイでも使うためやや複雑になってますが基本は終局までゲームを進めているだけです。

人間

人間による次の一手の選択は入力を受け取るだけです。

main.py: L558-L572

def choice_human(player):
    _, _, putable_position_nums = player
    choice = None
    while True:
        try:
            choice = input("Please enter number in {}: ".format(putable_position_nums))
            choice = int(choice)
            if choice in putable_position_nums:
                break
            else:
                print("{} is invalid".format(choice))
        except Exception:
            print("{} is invalid".format(choice))
    choice_data = { 'position_num': choice }
    return choice_data

基本的に人間と他のchoice_xxx関数で対戦するのでそれ用の関数も用意しました。

main.py: L630-L655

def play(choice_computer):
    choice1 = {
        'name': "You",
        'choice': choice_human
    }
    choice2 = {
        'name': "Computer",
        'choice': choice_computer
    }
    while True:
        print("start: {} vs {}".format(choice1['name'], choice2['name']))

        steps = game(choice1['choice'], choice2['choice'])

        is_black_win = is_win_game(steps, is_black = True)
        is_white_win = is_win_game(steps, is_black = False)
        if is_black_win:
            print("{} is win".format(choice1['name']))
        elif is_white_win:
            print("{} is win".format(choice2['name']))
        else:
            print("draw")
        save_playdata(steps)
        if not yes_no_input("Do you want to continue? [y/N]: "):
            break
        choice1, choice2 = choice2, choice1

ランダム

ランダムは文字通りランダムに次の一手を選択するだけです。

main.py: L216-L219

def choice_random(player):
    _, _, putable_position_nums = player
    choice_data = { 'position_num': np.random.choice(putable_position_nums) }
    return choice_data

コマンドラインで使えるようにしています。

main.py: L666-L667

    if len(args) > 1 and args[1] == 'play-random':
        play(choice_random)

ランダム選択のchoice_xxx関数は後述のモンテカルロのプレイアウトでも活躍します。

原始モンテカルロ

原始モンテカルロは選択可能な全て手に対してプレイアウトを行い勝率の一番高い手を次の一手として選択します。

main.py: L221-L229

def choice_primitive_monte_carlo(player, try_num = 150):
    _, _, putable_position_nums = player
    position_scores = np.zeros(len(putable_position_nums))
    for _ in range(try_num):
        playouts = [playout(player, position_num) for position_num in putable_position_nums]
        position_scores += np.array([1 if is_win else 0 for is_win in playouts])
    index = np.random.choice(np.where(position_scores == position_scores.max())[0])
    choice_data = { 'position_num': putable_position_nums[index] }
    return choice_data

プレイアウトはランダム選択同士の対戦をしているだけです。

main.py: L93-L98

def playout(player, position_num):
    _, is_black, _ = player
    board = put(player, position_num)
    puts = game(choice_random, choice_random, board, False)
    is_win = is_win_game(puts, is_black)
    return is_win

これもコマンドラインで使えるようにしています。

main.py: L668-L669

    elif len(args) > 1 and args[1] == 'play-primitive-monte-carlo':
        play(choice_primitive_monte_carlo)

MCTS

MCTSは子ノード選択と展開(_selection_expansion)、評価(_evaluation)、更新(_backup)を繰り返します。そして試行回数が一番多い手を次の一手として選択(_choice_node_index)します。

main.py: L231-L312

class ChoiceMonteCarloTreeSearch:
    # *** ...
    # *** ... L232-L301 (省略) ***
    # *** ...
    def __call__(self, player, try_num = 1500, expansion_num = 5):
        _, is_black, _ = player
        nodes = self._get_initial_nodes(player)
        for _ in range(try_num):
            nodes, node, path = self._selection_expansion(nodes, expansion_num)
            is_win = self._evaluation(node, is_black)
            nodes = self._backup(nodes, path, is_win)
        index = self._choice_node_index(nodes)
        choice = nodes[index]['position_num']
        choice_data = { 'position_num': choice }
        return choice_data

子ノード選択はUCB1が最大となる子ノードをリーフノードまで辿っていきます。展開はリーフノードの訪問回数が閾値を超えた場合に行います。

main.py: L248-L279

    def _get_ucb1(self, node, total_num):
        return (node['win_num'] / node['try_num']) + math.sqrt(2 * math.log(total_num) / node['try_num'])

    def _selection_node_index(self, nodes):
        total_num = functools.reduce(lambda total_num, node: total_num + node['try_num'], nodes, 0)
        ucb1s = np.array([self._get_ucb1(node, total_num) if node['try_num'] != 0 else -1 for node in nodes])
        indexs = np.where(ucb1s == -1)[0]  # -1 is infinity
        if len(indexs) == 0:
            indexs = np.where(ucb1s == ucb1s.max())[0]
        index = np.random.choice(indexs)
        return index

    def _selection_expansion(self, nodes, expansion_num):
        game = []
        node, path = None, []
        target_nodes = nodes
        while True:
            index = self._selection_node_index(target_nodes)
            path.append(index)
            node = target_nodes[index]
            if node['child_nodes'] is None:
                if node['try_num'] >= expansion_num:
                    if is_end_game(game, node['player']):
                        break
                    # expansion
                    node['child_nodes'] = self._get_initial_nodes(node['player'])
                else:
                    break
            target_nodes = node['child_nodes']
            choice_data = { 'position_num': node['position_num'] }
            game.append((node['player'], choice_data))
        return nodes, node, path

評価はプレイアウトを行い勝敗を決定します。

main.py: L281-L285

    def _evaluation(self, node, is_black):
        is_win = playout(node['player'], node['position_num'])
        _, node_is_black, _ = node['player']
        node_is_win = (is_black == node_is_black and is_win) or (is_black != node_is_black and not is_win)
        return node_is_win

更新は子ノード選択でメモしておいた選択ノードのパスを巡ってパラメータを更新していきます。

main.py: L287-L294

    def _backup(self, nodes, path, is_win):
        target_nodes = nodes
        for index in path:
            target_nodes[index]['try_num'] += 1
            if is_win:
                target_nodes[index]['win_num'] += 1
            target_nodes = target_nodes[index]['child_nodes']
        return nodes

最後に試行回数が一番多い手を選択します。

main.py: L296-L300

    def _choice_node_index(self, nodes):
        try_nums = np.array([node['try_num'] for node in nodes])
        indexs = np.where(try_nums == try_nums.max())[0]
        index = np.random.choice(indexs)
        return index

このchoice_xxx関数もコマンドラインで使えるようにしています。

main.py: L670-L671

    elif len(args) > 1 and args[1] == 'play-mcts':
        play(ChoiceMonteCarloTreeSearch())

デュアルネットワークのモデル

AlphaZeroもどきで利用することになるモデルです。ポリシーネットワークとバリューネットワークを統合したマルチタスク学習なデュアル残差ネットワーク構造になっています。*4

main.py: L149-L189

class DualNet(chainer.Chain):
    def __init__(self):
        super(DualNet, self).__init__()
        with self.init_scope():
            self.conv0 = L.Convolution2D(4,  48, 3, pad=1)
            self.conv1 = L.Convolution2D(48, 48, 3, pad=1)
            self.conv2 = L.Convolution2D(48, 48, 3, pad=1)
            self.conv3 = L.Convolution2D(48, 48, 3, pad=1)
            self.conv4 = L.Convolution2D(48, 48, 3, pad=1)

            self.bn0 = L.BatchNormalization(48)
            self.bn1 = L.BatchNormalization(48)
            self.bn2 = L.BatchNormalization(48)
            self.bn3 = L.BatchNormalization(48)
            self.bn4 = L.BatchNormalization(48)

            self.conv_p1 = L.Convolution2D(48, 2, 1)
            self.bn_p1   = L.BatchNormalization(2)
            self.fc_p2   = L.Linear(8 * 8 * 2, 8 * 8)

            self.conv_v1 = L.Convolution2D(48, 1, 1)
            self.bn_v1   = L.BatchNormalization(1)
            self.fc_v2   = L.Linear(8 * 8, 48)
            self.fc_v3   = L.Linear(48, 1)

    def __call__(self, x):
        # tiny ResNet
        h0 = F.relu(self.bn0(self.conv0(x)))
        h1 = F.relu(self.bn1(self.conv1(h0)))
        h2 = F.relu(self.bn2(self.conv2(h1)) + h0)
        h3 = F.relu(self.bn3(self.conv3(h2)))
        h4 = F.relu(self.bn4(self.conv4(h3)) + h2)

        h_p1 = F.relu(self.bn_p1(self.conv_p1(h4)))
        policy = self.fc_p2(h_p1)

        h_v1  = F.relu(self.bn_v1(self.conv_v1(h4)))
        h_v2  = F.relu(self.fc_v2(h_v1))
        value = F.tanh(self.fc_v3(h_v2))

        return policy, value

ネットワークへの入力は下記をまとめたものにしてみました。

  • 自分の石が置いてある場所が1、置いてない場所が0
  • 相手の石が置いてある場所が1、置いてない場所が0
  • 石が置いてない場所が1、置いてある場所が0
  • 自分が石を置ける場所が1、置けない場所が0

main.py: L137-L147

def get_dualnet_input_data(player):
    board, is_black, putable_position_nums = player

    mine    = np.array([1 if (is_black and v == 1) or (not is_black and v == -1) else 0 for v in board], dtype=np.float32)
    yours   = np.array([1 if (is_black and v == -1) or (not is_black and v == 1) else 0 for v in board], dtype=np.float32)
    blank   = np.array([1 if v == 0 else 0 for v in board], dtype=np.float32)
    putable = np.array([1 if i in putable_position_nums else 0 for i in range(64)], dtype=np.float32)

    # 64 + 64 + 64 + 64
    x = np.concatenate((mine, yours, blank, putable)).reshape((1, 4, 8, 8))
    return x

SLポリシーネットワーク

SLはSupervised Learning(教師付き学習)のことです。デュアルネットワークのポリシーネットワークの結果に従い次の一手を選択します。

main.py: L314-L328

class ChoiceSupervisedLearningPolicyNetwork:
    def __init__(self, model):
        self.model = model

    def __call__(self, player):
        _, _, putable_position_nums = player

        policy, _ = self.model(get_dualnet_input_data(player))

        putable_position_probabilities = np.array([policy[0].data[num] for num in putable_position_nums])
        indexs = np.where(putable_position_probabilities == putable_position_probabilities.max())[0]
        index = np.random.choice(indexs)
        choice = putable_position_nums[index]
        choice_data = { 'position_num': choice }
        return choice_data

このchoice_xxx関数もコマンドラインで使えるようにしています。モデルファイルからロードすることも可能です。

main.py: L672-L677

    elif len(args) > 1 and args[1] == 'play-sl-policy-network-random':
        play(ChoiceSupervisedLearningPolicyNetwork(DualNet()))
    elif len(args) > 2 and args[1] == 'play-sl-policy-network':
        model = DualNet()
        model.load(args[2])
        play(ChoiceSupervisedLearningPolicyNetwork(model))

APV-MCTS(AlphaZeroもどき)

APV-MCTSはプレイアウトを廃止し代わりにデュアルネットワークを使うように改良したMCTSです。子ノード選択と展開(_selection_expansion)、更新(_backup)を繰り返します。そして試行回数が一番多い手を次の一手として選択(_choice_node_index)します。

main.py: L330-L420

class ChoiceAsynchronousPolicyAndValueMonteCarloTreeSearch:
    # *** ...
    # *** ... L331-L412(省略) ***
    # *** ...
    def __call__(self, player, try_num = 1500):
        _, nodes = self._get_initial_nodes(player)
        for _ in range(try_num):
            nodes, node, path = self._selection_expansion(nodes)
            nodes = self._backup(nodes, path, node['value'])
        index = self._choice_node_index(nodes)
        choice = nodes[index]['position_num']
        candidates = [{ 'position_num': "{}".format(node['position_num']), 'try_num': "{}".format(node['try_num']) } for node in nodes]
        choice_data = { 'position_num': choice, 'candidates': candidates }
        return choice_data

子ノード選択はデュアルネットワークが出力する予測確率を考慮した評価値が最大となる子ノードをリーフノードまで辿っていきます。展開は無条件に行います。

main.py: L362-L390

    def _get_score(self, node, total_num):
        return (node['win_num'] / (1 + node['try_num'])) + node['probability'] * (math.sqrt(total_num) / (1 + node['try_num']))

    def _selection_node_index(self, nodes):
        total_num = functools.reduce(lambda total_num, node: total_num + node['try_num'], nodes, 0)
        scores = np.array([self._get_score(node, total_num) for node in nodes])
        indexs = np.where(scores == scores.max())[0]
        index = np.random.choice(indexs)
        return index

    def _selection_expansion(self, nodes):
        game = []
        node, path = None, []
        target_nodes = nodes
        while True:
            index = self._selection_node_index(target_nodes)
            path.append(index)
            node = target_nodes[index]
            if node['child_nodes'] is None:
                # expansion
                value, child_nodes = self._get_initial_nodes(node['player'])
                node['value'] = value
                if not is_end_game(game, node['player']):
                    node['child_nodes'] = child_nodes
                break
            target_nodes = node['child_nodes']
            choice_data = { 'position_num': node['position_num'] }
            game.append((node['player'], choice_data))
        return nodes, node, path

更新は子ノード選択でメモしておいた選択ノードのパスを巡ってパラメータを更新していきます。勝率はデュアルネットワークが出力するバリューを使います。

main.py: L392-L398

    def _backup(self, nodes, path, value):
        target_nodes = nodes
        for index in path:
            target_nodes[index]['try_num'] += 1
            target_nodes[index]['win_num'] += value
            target_nodes = target_nodes[index]['child_nodes']
        return nodes

最後に試行回数が一番多い手を選択します。試行回数の確率で選択するロジックは後述のセルフプレイで使います。

main.py: L400-L411

    def _choice_node_index(self, nodes):
        try_nums = np.array([node['try_num'] for node in nodes])
        try_nums_sum = try_nums.sum()
        index = 0
        if self.is_strict_choice or try_nums_sum == 0:
            indexs = np.where(try_nums == try_nums.max())[0]
            index = np.random.choice(indexs)
        else:
            try_nums = try_nums.astype(np.float32)
            try_nums /= try_nums_sum  # to probability
            index = np.random.choice(range(len(try_nums)), p = try_nums)
        return index

このchoice_xxx関数もコマンドラインで使えるようにしています。モデルファイルからロードすることも可能です。

main.py: L678-L683

    elif len(args) > 1 and args[1] == 'play-apv-mcts-random':
        play(ChoiceAsynchronousPolicyAndValueMonteCarloTreeSearch(DualNet()))
    elif len(args) > 2 and args[1] == 'play-apv-mcts':
        model = DualNet()
        model.load(args[2])
        play(ChoiceAsynchronousPolicyAndValueMonteCarloTreeSearch(model))

APV-MCTSのセルフプレイによる強化学習

APV-MCTSのセルフプレイを行いデュアルネットワークのモデルをトレーニングしていきます。暫定最強モデル同士のセルフプレイ(_self_play)、暫定最強モデルをベースにセルフプレイ結果でトレーニングした新モデル作成(_create_new_model)、旧モデルと新モデルの対戦による暫定最強モデルの決定(_evaluation)を繰り返します。そして最後の暫定最強モデルが結果のモデルになります。

main.py: L424-L556

class DualNetTrainer:
    # *** ...
    # *** ... L425-L546(省略) ***
    # *** ...
    def __call__(self, try_num = 100):
        for i in range(try_num):
            _, _, data_filename = self._self_play(self.model, self.model, is_strict_choice = False)
            steps_list = []
            with open(data_filename, 'r') as f:
                steps_list = f.readlines()
            new_model = self._create_new_model(steps_list)
            self._evaluation(new_model)
            print("[train] epoch: {} / {}".format(i + 1, try_num))
        return self.model, self.model_filename

暫定最強モデル同士のセルフプレイは次の新モデル作成で学習に使うための多様な棋譜の生成が目的です。そのためis_strict_choiceFalseを設定しAPV-MCTSは試行回数の確率で選択するモードにします。

main.py: L461-L488

    def _self_play(self, model1, model2, try_num = 2500, is_save_data = True, is_strict_choice = True):
        player1 = {
            'is_model1': True,
            'choice': ChoiceAsynchronousPolicyAndValueMonteCarloTreeSearch(model1, is_strict_choice),
            'win_num': 0
        }
        player2 = {
            'is_model1': False,
            'choice': ChoiceAsynchronousPolicyAndValueMonteCarloTreeSearch(model2, is_strict_choice),
            'win_num': 0
        }
        date_str   = datetime.date.today().strftime("%Y%m%d")
        unique_str = str(uuid.uuid4())
        data_filename = 'data/self_playdata_{}_{}.dat'.format(date_str, unique_str)
        print("[self play] data_filename: {}".format(data_filename))
        for i in range(try_num):
            steps = game(player1['choice'], player2['choice'], is_render = False)
            if is_win_game(steps, is_black = True):
                player1['win_num'] += 1
            if is_win_game(steps, is_black = False):
                player2['win_num'] += 1
            if is_save_data:
                self._save_self_playdata(steps, data_filename)
            player1, player2 = player2, player1
            (model1_player, model2_player) = (player1, player2) if player1['is_model1'] else (player2, player1)
            print("[self play] epoch: {} / {}, model1_win_num: {}, model2_win_num: {}".format(i + 1, try_num, model1_player['win_num'], model2_player['win_num']))
        (model1_win_num, model2_win_num) = (player1['win_num'], player2['win_num']) if player1['is_model1'] else (player2['win_num'], player1['win_num'])
        return model1_win_num, model2_win_num, data_filename

新モデル作成では暫定最強モデルと同じネットワークを作成しトレーニングします。トレーニングにはセルフプレイした結果の棋譜データからランダムに取り出した学習データを使います。

main.py: L490-L540

    def _get_train_y_policy(self, candidates, temperature = 0.5):
        y_policy = np.array([0] * 64, dtype=np.float32)
        sum_try_num = np.array([int(candidate['try_num']) ** (1 / temperature) for candidate in candidates]).sum()
        for candidate in candidates:
            y_policy[int(candidate['position_num'])] = (int(candidate['try_num']) ** (1 / temperature)) / sum_try_num
        return y_policy

    def _get_train_x(self, steps, step_num):
        position_nums = [int(step['position_num']) for step in steps]
        choice1 = ChoiceReplaySteps(np.array(position_nums, dtype=np.int32)[::2])
        choice2 = ChoiceReplaySteps(np.array(position_nums, dtype=np.int32)[1::2])
        steps = game(choice1, choice2, is_render = False, limit_step_num = step_num)
        player, _ = steps[-1]
        x = get_dualnet_input_data(player)
        return x

    def _get_train_random(self, steps_list):
        steps      = json.loads(np.random.choice(steps_list))
        step_index = np.random.randint(0, len(steps) - 1)

        y_policy = self._get_train_y_policy(steps[step_index]['candidates'])
        y_value  = np.array([[int(steps[step_index]['win_score'])]], dtype=np.float32)
        x        = self._get_train_x(steps, (step_index + 1))
        return x, y_policy, y_value

    def _get_train_batch(self, steps_list, batch_size):
        batch_x, batch_y_policy, batch_y_value = [], [], []
        for _ in range(batch_size):
            x, y_policy, y_value = self._get_train_random(steps_list)
            batch_x.append(x)
            batch_y_policy.append(y_policy)
            batch_y_value.append(y_value)
        x_train        = Variable(np.array(batch_x)).reshape(-1, 4, 8, 8)
        y_train_policy = Variable(np.array(batch_y_policy)).reshape(-1, 64)
        y_train_value  = Variable(np.array(batch_y_value)).reshape(-1, 1)
        return x_train, y_train_policy, y_train_value

    def _create_new_model(self, steps_list, epoch_num = 100, batch_size = 2048):
        model = DualNet()
        model.load(self.model_filename)
        optimizer = chainer.optimizers.Adam()
        optimizer.setup(model)
        for i in range(epoch_num):
            x_train, y_train_policy, y_train_value = self._get_train_batch(steps_list, batch_size)
            y_policy, y_value = model(x_train)
            model.cleargrads()
            loss = F.mean_squared_error(y_policy, y_train_policy) + F.mean_squared_error(y_value, y_train_value)
            loss.backward()
            optimizer.update()
            print("[new nodel] epoch: {} / {}, loss: {}".format(i + 1, epoch_num, loss))
        return model

旧モデルと新モデルの対戦による暫定最強モデルの決定では400戦して新モデルが220勝以上したら暫定最強モデルを変更するようにしました。

main.py: L542-L545

    def _evaluation(self, new_model):
        _, new_model_win_num, _ = self._self_play(self.model, new_model, try_num = 400, is_save_data = False)
        if new_model_win_num >= 220:
            self._set_model(new_model)

この強化学習コマンドラインで使えるようにしています。モデルファイルからロードすることも可能です。

main.py: L688-L697

    elif len(args) > 1 and args[1] == 'create-model':
        trainer = DualNetTrainer()
        _, model_filename = trainer()
        print(model_filename)
    elif len(args) > 2 and args[1] == 'train-model':
        model = DualNet()
        model.load(args[2])
        trainer = DualNetTrainer(model)
        _, model_filename = trainer()
        print(model_filename)

参考文献

ほぼほぼ下記の書籍を参考にさせていただきました。アルファ碁ゼロアルゴリズムが網羅されていてとても詳細に記載されています。

www.shoeisha.co.jp

最強囲碁AI アルファ碁解体新書 第6章 アルファ碁ゼロの解説

www.shoeisha.co.jp

下記の書籍も実装が載っていて参考にさせていただきました。

book.mynavi.jp

下記も解説が詳細に記載されています。

blog.brainpad.co.jp

今回、あまり参照できてないですが論文も。*5

Mastering the Game of Go with Deep Neural Networks and Tree Search

Mastering the Game of Go without Human Knowledge

最近出た論文の差分も気になります。

A general reinforcement learning algorithm that masters chess, shogi, and Go through self-play

tadaoyamaoka.hatenablog.com

まとめ

今回はオセロのプレイヤーのアルゴリズムをランダムから原始モンテカルロMCTS、AlphaZeroもどきへと進化させながら実装してみました。

ただ、残念なことに時間とマシンパワーが足りずちゃんと動かすところまではできませんでした。引き続きGPU対応やAWS/GCPで試すなどして実際にちゃんと動かすところまでも挑戦してみたいです。つまりクリスマスのオセロ相手を得ることはできませんでした。

*1:AlphaGoをリスペクト、インスパイヤしてheta-reversiにしました。hetaは古代ギリシア文字のヘータです。下手リバーシ。すっかり親父ギャグな年頃です。凍える

*2:残念ながらマシンパワーが足りず手元の環境で実行が終わったところを見た事はありませんが動くはずです...

*3:理論的な部分は参考文献に譲ります

*4:残差ブロックが2層しかなかったりとだいぶ軽量ですが

*5:一次情報に当たらないのは悪い癖ですね。。色々誤解して実装してしまってる箇所も多いと思います