카테고리 없음

MNIST Lab - 기본 MLP에서 CNN+BatchNorm 모델 구현하기

cedis 2026. 5. 30. 22:51

MNIST Lab CNN 실험

기본 MNIST Lab 구현 위에 CNN+BatchNorm 모델을 직접 추가한 기록이다.  어떤 파일을 만들고 어떤 계층을 구현해야 하는지 코드 기준으로 정리한다.

핵심부터 말하면

이 글은 기본 MLP 구현에서 출발해 CNN+BatchNorm 모델을 새로 추가하는 실험 기록이다. 목표는 784개 픽셀을 한 줄로만 보던 모델에서 벗어나, MNIST 이미지를 이미지 모양 그대로 처리하는 모델을 직접 구성하는 것이다.

이 글의 성공 기준

독자가 기본 MNIST Lab 코드가 있는 상태에서 새 CNN 계층 파일, CNN 모델 파일, Momentum, CNN 학습 루프를 추가해 같은 방향의 실험을 실행할 수 있어야 한다.

1. 왜 MLP에서 CNN으로 바꾸는가

구분 기본 MLP CNN 도전
입력 해석 784개 픽셀을 한 줄 벡터로 본다. 1x28x28 이미지의 위치와 주변 관계를 유지한다.
강점 구현이 단순하고 빠르게 기준점을 만든다. 작은 획이 어느 위치에 있는지 더 자연스럽게 본다.
한계 픽셀의 이웃 관계를 모델 구조가 직접 쓰지 못한다. 구현해야 할 계층과 backward가 늘어난다.
1
이미지 형태 유지

MNIST를 (N, 784)가 아니라 (N, 1, 28, 28)로 바꾼다.

2
Conv

5x5 필터 30개가 이미지를 훑으며 획의 지역 패턴을 찾는다.

3
Pooling

근처 특징을 묶어 크기를 줄이고 위치 변화에 조금 둔감하게 만든다.

4
Flatten

CNN 출력을 기존 Affine 계층이 받을 수 있게 2D 벡터로 펼친다.

5
BatchNorm + ReLU

은닉층 값을 안정화하고 비선형성을 준다.

2. 새로 추가할 파일 구조

파일 추가하는 것 왜 필요한가
cnn_layers.py im2col, col2im, Convolution, Pooling, Flatten 이미지 전용 계층을 직접 구현한다.
cnn_network.py SimpleConvNetBN 기존 ReLU, Affine, BatchNorm과 새 CNN 계층을 조립한다.
momentum.py Momentum optimizer 이번 CNN 실험에서 사용할 갱신 규칙이다.
training_cnn.py CNN용 train/evaluate 루프 4D 이미지 입력을 모델에 넣고 결과를 검증한다.
run_cnn_bn_experiment.py 실험 실행 스크립트 데이터 reshape, 모델 생성, 학습, 결과 저장을 담당한다.
중요한 분리

기존 기본 MLP 코드는 지우지 않는다. CNN 도전은 새 파일을 추가해 별도 모델로 실험하는 방식이다.

3. cnn_layers.py 전체 구현

CNN 구현에서 가장 큰 장벽은 합성곱을 빠르게 계산하는 방법이다. 여기서는 이미지 조각을 행렬로 펼치는 im2col 방식을 사용한다. forward에서는 im2col로 펼쳐 행렬 곱을 하고, backward에서는 col2im으로 gradient를 다시 이미지 모양으로 되돌린다.

# -*- coding: utf-8 -*-
"""NumPy CNN 계층 모음.

논문 실험 복사용으로 Convolution, Pooling, Flatten 계층을 추가한다.
기존 과제의 Affine, BatchNorm, ReLU 계층과 연결해서 사용한다.
"""

import numpy as np


def im2col(input_data, filter_h, filter_w, stride=1, pad=0):
    """4D 이미지 묶음을 2D 행렬로 펼친다."""
    N, C, H, W = input_data.shape
    out_h = (H + 2 * pad - filter_h) // stride + 1
    out_w = (W + 2 * pad - filter_w) // stride + 1

    img = np.pad(
        input_data,
        [(0, 0), (0, 0), (pad, pad), (pad, pad)],
        "constant",
    )
    col = np.zeros((N, C, filter_h, filter_w, out_h, out_w))

    for y in range(filter_h):
        y_max = y + stride * out_h
        for x in range(filter_w):
            x_max = x + stride * out_w
            col[:, :, y, x, :, :] = img[:, :, y:y_max:stride, x:x_max:stride]

    col = col.transpose(0, 4, 5, 1, 2, 3).reshape(N * out_h * out_w, -1)
    return col


def col2im(col, input_shape, filter_h, filter_w, stride=1, pad=0):
    """im2col로 펼친 gradient를 원래 4D 이미지 형태로 되돌린다."""
    N, C, H, W = input_shape
    out_h = (H + 2 * pad - filter_h) // stride + 1
    out_w = (W + 2 * pad - filter_w) // stride + 1
    col = col.reshape(N, out_h, out_w, C, filter_h, filter_w).transpose(
        0, 3, 4, 5, 1, 2
    )

    img = np.zeros((N, C, H + 2 * pad + stride - 1, W + 2 * pad + stride - 1))
    for y in range(filter_h):
        y_max = y + stride * out_h
        for x in range(filter_w):
            x_max = x + stride * out_w
            img[:, :, y:y_max:stride, x:x_max:stride] += col[:, :, y, x, :, :]

    return img[:, :, pad:H + pad, pad:W + pad]


class Convolution:
    """합성곱 계층."""

    def __init__(self, W, b, stride=1, pad=0):
        self.W = W
        self.b = b
        self.stride = stride
        self.pad = pad
        self.x = None
        self.col = None
        self.col_W = None
        self.dW = None
        self.db = None

    def forward(self, x):
        FN, C, FH, FW = self.W.shape
        N, _, H, W = x.shape
        out_h = (H + 2 * self.pad - FH) // self.stride + 1
        out_w = (W + 2 * self.pad - FW) // self.stride + 1

        col = im2col(x, FH, FW, self.stride, self.pad)
        col_W = self.W.reshape(FN, -1).T
        out = np.dot(col, col_W) + self.b
        out = out.reshape(N, out_h, out_w, -1).transpose(0, 3, 1, 2)

        self.x = x
        self.col = col
        self.col_W = col_W
        return out

    def backward(self, dout):
        FN, C, FH, FW = self.W.shape
        dout = dout.transpose(0, 2, 3, 1).reshape(-1, FN)

        self.db = np.sum(dout, axis=0)
        self.dW = np.dot(self.col.T, dout)
        self.dW = self.dW.transpose(1, 0).reshape(FN, C, FH, FW)

        dcol = np.dot(dout, self.col_W.T)
        dx = col2im(dcol, self.x.shape, FH, FW, self.stride, self.pad)
        return dx


class Pooling:
    """Max Pooling 계층."""

    def __init__(self, pool_h, pool_w, stride=1, pad=0):
        self.pool_h = pool_h
        self.pool_w = pool_w
        self.stride = stride
        self.pad = pad
        self.x = None
        self.arg_max = None

    def forward(self, x):
        N, C, H, W = x.shape
        out_h = (H - self.pool_h) // self.stride + 1
        out_w = (W - self.pool_w) // self.stride + 1

        col = im2col(x, self.pool_h, self.pool_w, self.stride, self.pad)
        col = col.reshape(-1, self.pool_h * self.pool_w)

        self.arg_max = np.argmax(col, axis=1)
        out = np.max(col, axis=1)
        out = out.reshape(N, out_h, out_w, C).transpose(0, 3, 1, 2)

        self.x = x
        return out

    def backward(self, dout):
        dout = dout.transpose(0, 2, 3, 1)

        pool_size = self.pool_h * self.pool_w
        dmax = np.zeros((dout.size, pool_size))
        dmax[np.arange(self.arg_max.size), self.arg_max.flatten()] = dout.flatten()
        dmax = dmax.reshape(dout.shape + (pool_size,))

        dcol = dmax.reshape(dmax.shape[0] * dmax.shape[1] * dmax.shape[2], -1)
        dx = col2im(dcol, self.x.shape, self.pool_h, self.pool_w, self.stride, self.pad)
        return dx


class Flatten:
    """4D CNN 출력을 Affine 계층이 받을 2D 행렬로 펼친다."""

    def __init__(self):
        self.original_shape = None

    def forward(self, x):
        self.original_shape = x.shape
        return x.reshape(x.shape[0], -1)

    def backward(self, dout):
        return dout.reshape(self.original_shape)

4. Convolution backward를 따로 읽어보기

1
dout 모양 변경

출력 gradient를 행렬 곱에 맞는 2D 형태로 바꾼다.

2
db

출력 채널 방향으로 gradient를 모두 더해 bias gradient를 만든다.

3
dW

입력 조각 col과 dout을 곱해 필터 gradient를 만든 뒤 원래 필터 모양으로 되돌린다.

4
dx

dout을 필터 행렬과 곱한 뒤 col2im으로 원래 입력 이미지 gradient로 되돌린다.

처음 헷갈리는 지점

Convolution backward는 새 수식을 외우는 문제가 아니다. forward에서 행렬 곱으로 바꿔 계산했으니, backward도 그 행렬 곱의 역방향 gradient를 원래 이미지 모양으로 되돌리는 문제다.

5. cnn_network.py 전체 구현

새 계층을 만들었으면 모델로 조립해야 한다. 이 모델은 Conv -> ReLU -> Pool -> Flatten -> Affine -> BatchNorm -> ReLU -> Affine -> Softmax 순서로 흐른다.

# -*- coding: utf-8 -*-
"""논문 구조를 참고한 SimpleConvNet + BatchNorm 모델."""

from collections import OrderedDict

import numpy as np

from activations import ReLU, Softmax
from cnn_layers import Convolution, Flatten, Pooling
from layers import Affine, BatchNorm
from losses import cross_entropy_loss


class SimpleConvNetBN:
    """Conv -> ReLU -> Pool -> Affine -> BN -> ReLU -> Affine -> Softmax."""

    def __init__(
        self,
        input_dim=(1, 28, 28),
        conv_param=None,
        hidden_size=100,
        output_size=10,
        weight_init_std=0.01,
    ):
        if conv_param is None:
            conv_param = {"filter_num": 30, "filter_size": 5, "pad": 0, "stride": 1}

        filter_num = conv_param["filter_num"]
        filter_size = conv_param["filter_size"]
        filter_pad = conv_param["pad"]
        filter_stride = conv_param["stride"]
        input_size = input_dim[1]
        conv_output_size = (
            input_size - filter_size + 2 * filter_pad
        ) // filter_stride + 1
        pool_output_size = int(filter_num * (conv_output_size / 2) * (conv_output_size / 2))

        self.params = {}
        self.params["W1"] = weight_init_std * np.random.randn(
            filter_num, input_dim[0], filter_size, filter_size
        )
        self.params["b1"] = np.zeros(filter_num)
        self.params["W2"] = weight_init_std * np.random.randn(pool_output_size, hidden_size)
        self.params["b2"] = np.zeros(hidden_size)
        self.params["gamma2"] = np.ones(hidden_size)
        self.params["beta2"] = np.zeros(hidden_size)
        self.params["W3"] = weight_init_std * np.random.randn(hidden_size, output_size)
        self.params["b3"] = np.zeros(output_size)

        self.layers = OrderedDict()
        self.layers["Conv1"] = Convolution(
            self.params["W1"],
            self.params["b1"],
            conv_param["stride"],
            conv_param["pad"],
        )
        self.layers["ReLU1"] = ReLU()
        self.layers["Pool1"] = Pooling(pool_h=2, pool_w=2, stride=2)
        self.layers["Flatten"] = Flatten()
        self.layers["Affine2"] = Affine(self.params["W2"], self.params["b2"])
        self.layers["BatchNorm2"] = BatchNorm(self.params["gamma2"], self.params["beta2"])
        self.layers["ReLU2"] = ReLU()
        self.layers["Affine3"] = Affine(self.params["W3"], self.params["b3"])
        self.softmax = Softmax()
        self.grads = {}

    def forward(self, x, train=True):
        out = x
        for layer in self.layers.values():
            if isinstance(layer, BatchNorm):
                out = layer.forward(out, train=train)
            else:
                out = layer.forward(out)
        return self.softmax.forward(out)

    def backward(self, dout):
        self.grads = {}
        dout = self.softmax.backward(dout)

        for name, layer in reversed(list(self.layers.items())):
            dout = layer.backward(dout)

            if isinstance(layer, Convolution):
                idx = name.replace("Conv", "")
                self.grads[f"W{idx}"] = layer.dW
                self.grads[f"b{idx}"] = layer.db

            if isinstance(layer, Affine):
                idx = name.replace("Affine", "")
                self.grads[f"W{idx}"] = layer.dW
                self.grads[f"b{idx}"] = layer.db

            if isinstance(layer, BatchNorm):
                idx = name.replace("BatchNorm", "")
                self.grads[f"gamma{idx}"] = layer.dgamma
                self.grads[f"beta{idx}"] = layer.dbeta

        return dout

    def loss(self, x, y):
        return cross_entropy_loss(self.forward(x, train=True), y)

    def predict(self, x, batch_size=100):
        preds = []
        for start in range(0, x.shape[0], batch_size):
            preds.append(self.forward(x[start:start + batch_size], train=False))
        return np.vstack(preds)

6. Momentum과 CNN 학습 루프

기본 구현에서는 Adam을 썼지만, 이번 CNN 실험은 Momentum으로 진행했다. Momentum은 이전 이동 방향을 기억해 SGD보다 덜 흔들리게 이동하는 optimizer다.

# -*- coding: utf-8 -*-
"""Momentum optimizer."""

import numpy as np


class Momentum:
    """v = momentum * v - lr * grad, param += v."""

    def __init__(self, lr=0.1, momentum=0.9):
        self.lr = lr
        self.momentum = momentum
        self.v = {}

    def update(self, params, grads):
        for key in params.keys():
            if key not in self.v:
                self.v[key] = np.zeros_like(params[key])
            self.v[key] = self.momentum * self.v[key] - self.lr * grads[key]
            params[key] += self.v[key]
# -*- coding: utf-8 -*-
"""CNN 학습/평가 루프."""

import time

import numpy as np

from losses import cross_entropy_loss


def _softmax_cross_entropy_grad(y_pred, y_true):
    batch_size = y_pred.shape[0]
    dout = y_pred.copy()
    dout[np.arange(batch_size), y_true] -= 1
    dout /= batch_size
    return dout


def train_cnn(
    model,
    optimizer,
    x_train,
    y_train,
    x_test,
    y_test,
    epochs=20,
    batch_size=100,
    eval_sample_num=1000,
    seed=42,
):
    rng = np.random.default_rng(seed)
    num_train = x_train.shape[0]
    loss_history = []
    train_acc_history = []
    test_acc_history = []
    started = time.time()

    for epoch in range(1, epochs + 1):
        indices = rng.permutation(num_train)
        epoch_loss_sum = 0.0
        epoch_seen = 0

        for start in range(0, num_train, batch_size):
            batch_indices = indices[start:start + batch_size]
            x_batch = x_train[batch_indices]
            y_batch = y_train[batch_indices]

            y_pred = model.forward(x_batch, train=True)
            loss = cross_entropy_loss(y_pred, y_batch)
            dout = _softmax_cross_entropy_grad(y_pred, y_batch)
            model.backward(dout)
            optimizer.update(model.params, model.grads)

            epoch_loss_sum += float(loss) * x_batch.shape[0]
            epoch_seen += x_batch.shape[0]

        epoch_loss = epoch_loss_sum / epoch_seen
        loss_history.append(epoch_loss)

        if eval_sample_num:
            train_idx = rng.choice(num_train, size=eval_sample_num, replace=False)
            test_idx = rng.choice(x_test.shape[0], size=eval_sample_num, replace=False)
            train_acc = accuracy(model, x_train[train_idx], y_train[train_idx])
            test_acc = accuracy(model, x_test[test_idx], y_test[test_idx])
        else:
            train_acc = accuracy(model, x_train, y_train)
            test_acc = accuracy(model, x_test, y_test)

        train_acc_history.append(train_acc)
        test_acc_history.append(test_acc)
        print(
            f"epoch={epoch:02d} loss={epoch_loss:.4f} "
            f"train_acc={train_acc:.2f} test_acc={test_acc:.2f} "
            f"elapsed={time.time() - started:.1f}s",
            flush=True,
        )

    final_test_acc = accuracy(model, x_test, y_test)
    return {
        "loss_history": loss_history,
        "train_acc_history": train_acc_history,
        "test_acc_history": test_acc_history,
        "final_test_accuracy": final_test_acc,
        "elapsed_sec": time.time() - started,
        "params": sum(p.size for p in model.params.values()),
    }


def accuracy(model, x, y, batch_size=100):
    y_pred = model.predict(x, batch_size=batch_size)
    return float(np.mean(np.argmax(y_pred, axis=1) == y) * 100)

7. 실행 스크립트

실행 스크립트에서는 MNIST 입력을 (N, 1, 28, 28)로 reshape한다. 이 줄이 빠지면 CNN 계층은 입력을 이미지로 볼 수 없다.

# -*- coding: utf-8 -*-
"""논문 구조 기반 CNN+BN MNIST 실험 실행 스크립트."""

import argparse
import json
import sys
from pathlib import Path

import numpy as np

sys.path.insert(0, str(Path(__file__).resolve().parent / "src"))

from data import load_mnist
from cnn_network import SimpleConvNetBN
from momentum import Momentum
from training_cnn import train_cnn


def main():
    parser = argparse.ArgumentParser()
    parser.add_argument("--epochs", type=int, default=20)
    parser.add_argument("--batch-size", type=int, default=100)
    parser.add_argument("--lr", type=float, default=0.1)
    parser.add_argument("--momentum", type=float, default=0.9)
    parser.add_argument("--seed", type=int, default=42)
    parser.add_argument("--eval-sample-num", type=int, default=1000)
    parser.add_argument("--train-limit", type=int, default=0)
    parser.add_argument("--test-limit", type=int, default=0)
    args = parser.parse_args()

    np.random.seed(args.seed)
    (x_train, y_train), (x_test, y_test) = load_mnist()
    x_train = x_train.reshape(-1, 1, 28, 28)
    x_test = x_test.reshape(-1, 1, 28, 28)

    if args.train_limit:
        x_train = x_train[:args.train_limit]
        y_train = y_train[:args.train_limit]
    if args.test_limit:
        x_test = x_test[:args.test_limit]
        y_test = y_test[:args.test_limit]

    model = SimpleConvNetBN(
        input_dim=(1, 28, 28),
        conv_param={"filter_num": 30, "filter_size": 5, "pad": 0, "stride": 1},
        hidden_size=100,
        output_size=10,
        weight_init_std=0.01,
    )
    optimizer = Momentum(lr=args.lr, momentum=args.momentum)

    result = train_cnn(
        model,
        optimizer,
        x_train,
        y_train,
        x_test,
        y_test,
        epochs=args.epochs,
        batch_size=args.batch_size,
        eval_sample_num=args.eval_sample_num,
        seed=args.seed,
    )

    result.update(
        {
            "model": "Conv(30,5x5)-ReLU-MaxPool-Affine(100)-BN-ReLU-Affine(10)-Softmax",
            "optimizer": "Momentum",
            "lr": args.lr,
            "momentum": args.momentum,
            "epochs": args.epochs,
            "batch_size": args.batch_size,
            "seed": args.seed,
            "train_size": int(x_train.shape[0]),
            "test_size": int(x_test.shape[0]),
        }
    )

    out_dir = Path("results")
    out_dir.mkdir(exist_ok=True)
    out_file = out_dir / f"cnn_bn_result_e{args.epochs}_seed{args.seed}.json"
    out_file.write_text(json.dumps(result, ensure_ascii=False, indent=2), encoding="utf-8")
    print(json.dumps(result, ensure_ascii=False, indent=2))
    print(f"saved={out_file}")


if __name__ == "__main__":
    main()

8. 검증한 결과

{
  "loss_history": [
    0.12507245302729014,
    0.03882406847697987,
    0.025119677322721735,
    0.017189651093665284,
    0.011629340683137241,
    0.00833651693714278,
    0.006507629267105368,
    0.005364056677391568,
    0.005077262067019434,
    0.002892537238260184
  ],
  "train_acc_history": [
    99.1,
    98.9,
    99.6,
    99.8,
    100.0,
    99.9,
    99.8,
    100.0,
    99.8,
    100.0
  ],
  "test_acc_history": [
    98.8,
    98.4,
    98.9,
    98.8,
    99.3,
    99.4,
    98.8,
    99.4,
    98.6,
    99.1
  ],
  "final_test_accuracy": 99.09,
  "elapsed_sec": 1425.5651092529297,
  "params": 434090,
  "model": "Conv(30,5x5)-ReLU-MaxPool-Affine(100)-BN-ReLU-Affine(10)-Softmax",
  "optimizer": "Momentum",
  "lr": 0.1,
  "momentum": 0.9,
  "epochs": 10,
  "batch_size": 100,
  "seed": 42,
  "train_size": 60000,
  "test_size": 10000
}
항목
기본 MLP 기준 98.41%
CNN+BN 도전 99.09%
파라미터 수 434,090
학습 시간 약 1425초

9. 이 글에서 기억할 것

한 문장 정리

CNN+BN 도전은 기본 MLP를 고치는 작업이 아니라, MNIST 이미지를 이미지답게 보기 위해 Convolution, Pooling, Flatten, CNN용 학습 루프를 새로 추가한 독립 실험이다.

스스로 점검
  1. 왜 MNIST 입력을 (N, 1, 28, 28)로 바꾸는가?
  2. im2col을 쓰면 합성곱 계산이 어떤 행렬 곱으로 바뀌는가?
  3. Pooling backward에서 왜 arg_max가 필요한가?
  4. BatchNorm은 CNN의 어느 지점에 들어갔는가?