MNIST Lab CNN 실험
기본 MNIST Lab 구현 위에 CNN+BatchNorm 모델을 직접 추가한 기록이다. 어떤 파일을 만들고 어떤 계층을 구현해야 하는지 코드 기준으로 정리한다.
핵심부터 말하면
이 글은 기본 MLP 구현에서 출발해 CNN+BatchNorm 모델을 새로 추가하는 실험 기록이다. 목표는 784개 픽셀을 한 줄로만 보던 모델에서 벗어나, MNIST 이미지를 이미지 모양 그대로 처리하는 모델을 직접 구성하는 것이다.
독자가 기본 MNIST Lab 코드가 있는 상태에서 새 CNN 계층 파일, CNN 모델 파일, Momentum, CNN 학습 루프를 추가해 같은 방향의 실험을 실행할 수 있어야 한다.
1. 왜 MLP에서 CNN으로 바꾸는가
| 구분 | 기본 MLP | CNN 도전 |
|---|---|---|
| 입력 해석 | 784개 픽셀을 한 줄 벡터로 본다. | 1x28x28 이미지의 위치와 주변 관계를 유지한다. |
| 강점 | 구현이 단순하고 빠르게 기준점을 만든다. | 작은 획이 어느 위치에 있는지 더 자연스럽게 본다. |
| 한계 | 픽셀의 이웃 관계를 모델 구조가 직접 쓰지 못한다. | 구현해야 할 계층과 backward가 늘어난다. |
MNIST를 (N, 784)가 아니라 (N, 1, 28, 28)로 바꾼다.
5x5 필터 30개가 이미지를 훑으며 획의 지역 패턴을 찾는다.
근처 특징을 묶어 크기를 줄이고 위치 변화에 조금 둔감하게 만든다.
CNN 출력을 기존 Affine 계층이 받을 수 있게 2D 벡터로 펼친다.
은닉층 값을 안정화하고 비선형성을 준다.
2. 새로 추가할 파일 구조
| 파일 | 추가하는 것 | 왜 필요한가 |
|---|---|---|
| cnn_layers.py | im2col, col2im, Convolution, Pooling, Flatten | 이미지 전용 계층을 직접 구현한다. |
| cnn_network.py | SimpleConvNetBN | 기존 ReLU, Affine, BatchNorm과 새 CNN 계층을 조립한다. |
| momentum.py | Momentum optimizer | 이번 CNN 실험에서 사용할 갱신 규칙이다. |
| training_cnn.py | CNN용 train/evaluate 루프 | 4D 이미지 입력을 모델에 넣고 결과를 검증한다. |
| run_cnn_bn_experiment.py | 실험 실행 스크립트 | 데이터 reshape, 모델 생성, 학습, 결과 저장을 담당한다. |
기존 기본 MLP 코드는 지우지 않는다. CNN 도전은 새 파일을 추가해 별도 모델로 실험하는 방식이다.
3. cnn_layers.py 전체 구현
CNN 구현에서 가장 큰 장벽은 합성곱을 빠르게 계산하는 방법이다. 여기서는 이미지 조각을 행렬로 펼치는 im2col 방식을 사용한다. forward에서는 im2col로 펼쳐 행렬 곱을 하고, backward에서는 col2im으로 gradient를 다시 이미지 모양으로 되돌린다.
# -*- coding: utf-8 -*-
"""NumPy CNN 계층 모음.
논문 실험 복사용으로 Convolution, Pooling, Flatten 계층을 추가한다.
기존 과제의 Affine, BatchNorm, ReLU 계층과 연결해서 사용한다.
"""
import numpy as np
def im2col(input_data, filter_h, filter_w, stride=1, pad=0):
"""4D 이미지 묶음을 2D 행렬로 펼친다."""
N, C, H, W = input_data.shape
out_h = (H + 2 * pad - filter_h) // stride + 1
out_w = (W + 2 * pad - filter_w) // stride + 1
img = np.pad(
input_data,
[(0, 0), (0, 0), (pad, pad), (pad, pad)],
"constant",
)
col = np.zeros((N, C, filter_h, filter_w, out_h, out_w))
for y in range(filter_h):
y_max = y + stride * out_h
for x in range(filter_w):
x_max = x + stride * out_w
col[:, :, y, x, :, :] = img[:, :, y:y_max:stride, x:x_max:stride]
col = col.transpose(0, 4, 5, 1, 2, 3).reshape(N * out_h * out_w, -1)
return col
def col2im(col, input_shape, filter_h, filter_w, stride=1, pad=0):
"""im2col로 펼친 gradient를 원래 4D 이미지 형태로 되돌린다."""
N, C, H, W = input_shape
out_h = (H + 2 * pad - filter_h) // stride + 1
out_w = (W + 2 * pad - filter_w) // stride + 1
col = col.reshape(N, out_h, out_w, C, filter_h, filter_w).transpose(
0, 3, 4, 5, 1, 2
)
img = np.zeros((N, C, H + 2 * pad + stride - 1, W + 2 * pad + stride - 1))
for y in range(filter_h):
y_max = y + stride * out_h
for x in range(filter_w):
x_max = x + stride * out_w
img[:, :, y:y_max:stride, x:x_max:stride] += col[:, :, y, x, :, :]
return img[:, :, pad:H + pad, pad:W + pad]
class Convolution:
"""합성곱 계층."""
def __init__(self, W, b, stride=1, pad=0):
self.W = W
self.b = b
self.stride = stride
self.pad = pad
self.x = None
self.col = None
self.col_W = None
self.dW = None
self.db = None
def forward(self, x):
FN, C, FH, FW = self.W.shape
N, _, H, W = x.shape
out_h = (H + 2 * self.pad - FH) // self.stride + 1
out_w = (W + 2 * self.pad - FW) // self.stride + 1
col = im2col(x, FH, FW, self.stride, self.pad)
col_W = self.W.reshape(FN, -1).T
out = np.dot(col, col_W) + self.b
out = out.reshape(N, out_h, out_w, -1).transpose(0, 3, 1, 2)
self.x = x
self.col = col
self.col_W = col_W
return out
def backward(self, dout):
FN, C, FH, FW = self.W.shape
dout = dout.transpose(0, 2, 3, 1).reshape(-1, FN)
self.db = np.sum(dout, axis=0)
self.dW = np.dot(self.col.T, dout)
self.dW = self.dW.transpose(1, 0).reshape(FN, C, FH, FW)
dcol = np.dot(dout, self.col_W.T)
dx = col2im(dcol, self.x.shape, FH, FW, self.stride, self.pad)
return dx
class Pooling:
"""Max Pooling 계층."""
def __init__(self, pool_h, pool_w, stride=1, pad=0):
self.pool_h = pool_h
self.pool_w = pool_w
self.stride = stride
self.pad = pad
self.x = None
self.arg_max = None
def forward(self, x):
N, C, H, W = x.shape
out_h = (H - self.pool_h) // self.stride + 1
out_w = (W - self.pool_w) // self.stride + 1
col = im2col(x, self.pool_h, self.pool_w, self.stride, self.pad)
col = col.reshape(-1, self.pool_h * self.pool_w)
self.arg_max = np.argmax(col, axis=1)
out = np.max(col, axis=1)
out = out.reshape(N, out_h, out_w, C).transpose(0, 3, 1, 2)
self.x = x
return out
def backward(self, dout):
dout = dout.transpose(0, 2, 3, 1)
pool_size = self.pool_h * self.pool_w
dmax = np.zeros((dout.size, pool_size))
dmax[np.arange(self.arg_max.size), self.arg_max.flatten()] = dout.flatten()
dmax = dmax.reshape(dout.shape + (pool_size,))
dcol = dmax.reshape(dmax.shape[0] * dmax.shape[1] * dmax.shape[2], -1)
dx = col2im(dcol, self.x.shape, self.pool_h, self.pool_w, self.stride, self.pad)
return dx
class Flatten:
"""4D CNN 출력을 Affine 계층이 받을 2D 행렬로 펼친다."""
def __init__(self):
self.original_shape = None
def forward(self, x):
self.original_shape = x.shape
return x.reshape(x.shape[0], -1)
def backward(self, dout):
return dout.reshape(self.original_shape)
4. Convolution backward를 따로 읽어보기
출력 gradient를 행렬 곱에 맞는 2D 형태로 바꾼다.
출력 채널 방향으로 gradient를 모두 더해 bias gradient를 만든다.
입력 조각 col과 dout을 곱해 필터 gradient를 만든 뒤 원래 필터 모양으로 되돌린다.
dout을 필터 행렬과 곱한 뒤 col2im으로 원래 입력 이미지 gradient로 되돌린다.
Convolution backward는 새 수식을 외우는 문제가 아니다. forward에서 행렬 곱으로 바꿔 계산했으니, backward도 그 행렬 곱의 역방향 gradient를 원래 이미지 모양으로 되돌리는 문제다.
5. cnn_network.py 전체 구현
새 계층을 만들었으면 모델로 조립해야 한다. 이 모델은 Conv -> ReLU -> Pool -> Flatten -> Affine -> BatchNorm -> ReLU -> Affine -> Softmax 순서로 흐른다.
# -*- coding: utf-8 -*-
"""논문 구조를 참고한 SimpleConvNet + BatchNorm 모델."""
from collections import OrderedDict
import numpy as np
from activations import ReLU, Softmax
from cnn_layers import Convolution, Flatten, Pooling
from layers import Affine, BatchNorm
from losses import cross_entropy_loss
class SimpleConvNetBN:
"""Conv -> ReLU -> Pool -> Affine -> BN -> ReLU -> Affine -> Softmax."""
def __init__(
self,
input_dim=(1, 28, 28),
conv_param=None,
hidden_size=100,
output_size=10,
weight_init_std=0.01,
):
if conv_param is None:
conv_param = {"filter_num": 30, "filter_size": 5, "pad": 0, "stride": 1}
filter_num = conv_param["filter_num"]
filter_size = conv_param["filter_size"]
filter_pad = conv_param["pad"]
filter_stride = conv_param["stride"]
input_size = input_dim[1]
conv_output_size = (
input_size - filter_size + 2 * filter_pad
) // filter_stride + 1
pool_output_size = int(filter_num * (conv_output_size / 2) * (conv_output_size / 2))
self.params = {}
self.params["W1"] = weight_init_std * np.random.randn(
filter_num, input_dim[0], filter_size, filter_size
)
self.params["b1"] = np.zeros(filter_num)
self.params["W2"] = weight_init_std * np.random.randn(pool_output_size, hidden_size)
self.params["b2"] = np.zeros(hidden_size)
self.params["gamma2"] = np.ones(hidden_size)
self.params["beta2"] = np.zeros(hidden_size)
self.params["W3"] = weight_init_std * np.random.randn(hidden_size, output_size)
self.params["b3"] = np.zeros(output_size)
self.layers = OrderedDict()
self.layers["Conv1"] = Convolution(
self.params["W1"],
self.params["b1"],
conv_param["stride"],
conv_param["pad"],
)
self.layers["ReLU1"] = ReLU()
self.layers["Pool1"] = Pooling(pool_h=2, pool_w=2, stride=2)
self.layers["Flatten"] = Flatten()
self.layers["Affine2"] = Affine(self.params["W2"], self.params["b2"])
self.layers["BatchNorm2"] = BatchNorm(self.params["gamma2"], self.params["beta2"])
self.layers["ReLU2"] = ReLU()
self.layers["Affine3"] = Affine(self.params["W3"], self.params["b3"])
self.softmax = Softmax()
self.grads = {}
def forward(self, x, train=True):
out = x
for layer in self.layers.values():
if isinstance(layer, BatchNorm):
out = layer.forward(out, train=train)
else:
out = layer.forward(out)
return self.softmax.forward(out)
def backward(self, dout):
self.grads = {}
dout = self.softmax.backward(dout)
for name, layer in reversed(list(self.layers.items())):
dout = layer.backward(dout)
if isinstance(layer, Convolution):
idx = name.replace("Conv", "")
self.grads[f"W{idx}"] = layer.dW
self.grads[f"b{idx}"] = layer.db
if isinstance(layer, Affine):
idx = name.replace("Affine", "")
self.grads[f"W{idx}"] = layer.dW
self.grads[f"b{idx}"] = layer.db
if isinstance(layer, BatchNorm):
idx = name.replace("BatchNorm", "")
self.grads[f"gamma{idx}"] = layer.dgamma
self.grads[f"beta{idx}"] = layer.dbeta
return dout
def loss(self, x, y):
return cross_entropy_loss(self.forward(x, train=True), y)
def predict(self, x, batch_size=100):
preds = []
for start in range(0, x.shape[0], batch_size):
preds.append(self.forward(x[start:start + batch_size], train=False))
return np.vstack(preds)
6. Momentum과 CNN 학습 루프
기본 구현에서는 Adam을 썼지만, 이번 CNN 실험은 Momentum으로 진행했다. Momentum은 이전 이동 방향을 기억해 SGD보다 덜 흔들리게 이동하는 optimizer다.
# -*- coding: utf-8 -*-
"""Momentum optimizer."""
import numpy as np
class Momentum:
"""v = momentum * v - lr * grad, param += v."""
def __init__(self, lr=0.1, momentum=0.9):
self.lr = lr
self.momentum = momentum
self.v = {}
def update(self, params, grads):
for key in params.keys():
if key not in self.v:
self.v[key] = np.zeros_like(params[key])
self.v[key] = self.momentum * self.v[key] - self.lr * grads[key]
params[key] += self.v[key]
# -*- coding: utf-8 -*-
"""CNN 학습/평가 루프."""
import time
import numpy as np
from losses import cross_entropy_loss
def _softmax_cross_entropy_grad(y_pred, y_true):
batch_size = y_pred.shape[0]
dout = y_pred.copy()
dout[np.arange(batch_size), y_true] -= 1
dout /= batch_size
return dout
def train_cnn(
model,
optimizer,
x_train,
y_train,
x_test,
y_test,
epochs=20,
batch_size=100,
eval_sample_num=1000,
seed=42,
):
rng = np.random.default_rng(seed)
num_train = x_train.shape[0]
loss_history = []
train_acc_history = []
test_acc_history = []
started = time.time()
for epoch in range(1, epochs + 1):
indices = rng.permutation(num_train)
epoch_loss_sum = 0.0
epoch_seen = 0
for start in range(0, num_train, batch_size):
batch_indices = indices[start:start + batch_size]
x_batch = x_train[batch_indices]
y_batch = y_train[batch_indices]
y_pred = model.forward(x_batch, train=True)
loss = cross_entropy_loss(y_pred, y_batch)
dout = _softmax_cross_entropy_grad(y_pred, y_batch)
model.backward(dout)
optimizer.update(model.params, model.grads)
epoch_loss_sum += float(loss) * x_batch.shape[0]
epoch_seen += x_batch.shape[0]
epoch_loss = epoch_loss_sum / epoch_seen
loss_history.append(epoch_loss)
if eval_sample_num:
train_idx = rng.choice(num_train, size=eval_sample_num, replace=False)
test_idx = rng.choice(x_test.shape[0], size=eval_sample_num, replace=False)
train_acc = accuracy(model, x_train[train_idx], y_train[train_idx])
test_acc = accuracy(model, x_test[test_idx], y_test[test_idx])
else:
train_acc = accuracy(model, x_train, y_train)
test_acc = accuracy(model, x_test, y_test)
train_acc_history.append(train_acc)
test_acc_history.append(test_acc)
print(
f"epoch={epoch:02d} loss={epoch_loss:.4f} "
f"train_acc={train_acc:.2f} test_acc={test_acc:.2f} "
f"elapsed={time.time() - started:.1f}s",
flush=True,
)
final_test_acc = accuracy(model, x_test, y_test)
return {
"loss_history": loss_history,
"train_acc_history": train_acc_history,
"test_acc_history": test_acc_history,
"final_test_accuracy": final_test_acc,
"elapsed_sec": time.time() - started,
"params": sum(p.size for p in model.params.values()),
}
def accuracy(model, x, y, batch_size=100):
y_pred = model.predict(x, batch_size=batch_size)
return float(np.mean(np.argmax(y_pred, axis=1) == y) * 100)
7. 실행 스크립트
실행 스크립트에서는 MNIST 입력을 (N, 1, 28, 28)로 reshape한다. 이 줄이 빠지면 CNN 계층은 입력을 이미지로 볼 수 없다.
# -*- coding: utf-8 -*-
"""논문 구조 기반 CNN+BN MNIST 실험 실행 스크립트."""
import argparse
import json
import sys
from pathlib import Path
import numpy as np
sys.path.insert(0, str(Path(__file__).resolve().parent / "src"))
from data import load_mnist
from cnn_network import SimpleConvNetBN
from momentum import Momentum
from training_cnn import train_cnn
def main():
parser = argparse.ArgumentParser()
parser.add_argument("--epochs", type=int, default=20)
parser.add_argument("--batch-size", type=int, default=100)
parser.add_argument("--lr", type=float, default=0.1)
parser.add_argument("--momentum", type=float, default=0.9)
parser.add_argument("--seed", type=int, default=42)
parser.add_argument("--eval-sample-num", type=int, default=1000)
parser.add_argument("--train-limit", type=int, default=0)
parser.add_argument("--test-limit", type=int, default=0)
args = parser.parse_args()
np.random.seed(args.seed)
(x_train, y_train), (x_test, y_test) = load_mnist()
x_train = x_train.reshape(-1, 1, 28, 28)
x_test = x_test.reshape(-1, 1, 28, 28)
if args.train_limit:
x_train = x_train[:args.train_limit]
y_train = y_train[:args.train_limit]
if args.test_limit:
x_test = x_test[:args.test_limit]
y_test = y_test[:args.test_limit]
model = SimpleConvNetBN(
input_dim=(1, 28, 28),
conv_param={"filter_num": 30, "filter_size": 5, "pad": 0, "stride": 1},
hidden_size=100,
output_size=10,
weight_init_std=0.01,
)
optimizer = Momentum(lr=args.lr, momentum=args.momentum)
result = train_cnn(
model,
optimizer,
x_train,
y_train,
x_test,
y_test,
epochs=args.epochs,
batch_size=args.batch_size,
eval_sample_num=args.eval_sample_num,
seed=args.seed,
)
result.update(
{
"model": "Conv(30,5x5)-ReLU-MaxPool-Affine(100)-BN-ReLU-Affine(10)-Softmax",
"optimizer": "Momentum",
"lr": args.lr,
"momentum": args.momentum,
"epochs": args.epochs,
"batch_size": args.batch_size,
"seed": args.seed,
"train_size": int(x_train.shape[0]),
"test_size": int(x_test.shape[0]),
}
)
out_dir = Path("results")
out_dir.mkdir(exist_ok=True)
out_file = out_dir / f"cnn_bn_result_e{args.epochs}_seed{args.seed}.json"
out_file.write_text(json.dumps(result, ensure_ascii=False, indent=2), encoding="utf-8")
print(json.dumps(result, ensure_ascii=False, indent=2))
print(f"saved={out_file}")
if __name__ == "__main__":
main()
8. 검증한 결과
{
"loss_history": [
0.12507245302729014,
0.03882406847697987,
0.025119677322721735,
0.017189651093665284,
0.011629340683137241,
0.00833651693714278,
0.006507629267105368,
0.005364056677391568,
0.005077262067019434,
0.002892537238260184
],
"train_acc_history": [
99.1,
98.9,
99.6,
99.8,
100.0,
99.9,
99.8,
100.0,
99.8,
100.0
],
"test_acc_history": [
98.8,
98.4,
98.9,
98.8,
99.3,
99.4,
98.8,
99.4,
98.6,
99.1
],
"final_test_accuracy": 99.09,
"elapsed_sec": 1425.5651092529297,
"params": 434090,
"model": "Conv(30,5x5)-ReLU-MaxPool-Affine(100)-BN-ReLU-Affine(10)-Softmax",
"optimizer": "Momentum",
"lr": 0.1,
"momentum": 0.9,
"epochs": 10,
"batch_size": 100,
"seed": 42,
"train_size": 60000,
"test_size": 10000
}
| 항목 | 값 |
|---|---|
| 기본 MLP 기준 | 98.41% |
| CNN+BN 도전 | 99.09% |
| 파라미터 수 | 434,090 |
| 학습 시간 | 약 1425초 |
9. 이 글에서 기억할 것
CNN+BN 도전은 기본 MLP를 고치는 작업이 아니라, MNIST 이미지를 이미지답게 보기 위해 Convolution, Pooling, Flatten, CNN용 학습 루프를 새로 추가한 독립 실험이다.
- 왜 MNIST 입력을 (N, 1, 28, 28)로 바꾸는가?
- im2col을 쓰면 합성곱 계산이 어떤 행렬 곱으로 바뀌는가?
- Pooling backward에서 왜 arg_max가 필요한가?
- BatchNorm은 CNN의 어느 지점에 들어갔는가?