본문 바로가기
데이터 AI 인사이트 👩🏻‍💻/정리노트

[딥러닝 프로젝트] LSTM 기반 운동 동작 인식: 미디어파이프 활용

by Hayley S 2025. 2. 6.

이 프로젝트는 MediaPipeLSTM(Long Short-Term Memory) 모델을 활용하여 사람의 운동 동작(등, 스쿼트, 옆구리)을 인식하는 시스템을 구축하는 것을 목표로 하였다. 주요 단계는 데이터 수집 및 전처리, 모델 학습, 그리고 실시간 동작 인식으로 구성된다.

MediaPipe → 실시간 관절 좌표 추출
LSTM 모델 → 운동 동작을 시계열 데이터로 학습
OpenCV → 웹캠 영상 처리 및 GUI 구성
NumPy & Pandas → 데이터 전처리 및 저장

코드 및 데이터: https://github.com/haewon1219/lstm-motion-recognition


1. 데이터 수집과 전처리

motion_data.py 스크립트는 OpenCV를 활용하여 웹캠에서 실시간으로 프레임을 받아오고, MediaPipe를 통해 33개의 관절 좌표를 감지하여 데이터를 수집했다. MediaPipe의 Pose 모듈을 사용하여 신체의 33개 주요 관절을 감지하고, 해당 데이터를 NumPy 배열 형식으로 변환하여 저장한다. 이렇게 수집된 데이터는 스쿼트, 백, 사이드 총 3가지 운동 동작에 따라 분류되며, 이후 모델 학습을 위한 데이터셋으로 활용된다. 사용자 운동 인식의 정확도를 높이기 위해 총 5명의 참가자가 각 동작을 20회씩 수행하여, 동작별로 100개의 데이터셋을 확보하였다. 또한, 데이터 수집이 완료된 후 자동으로 .npy 파일로 저장되도록 구현하였다.

수집한 데이터 저장
GUI 화면

데이터 수집 루프
1) 웹캠으로부터 프레임을 읽어온다.
2) 프레임을 RGB로 변환한 후, MediaPipe를 통해 관절 좌표를 추출한다.
3) 추출된 좌표를 리스트에 저장한다.

import cv2
import numpy as np
import mediapipe as mp
import os
import time

# Initialize MediaPipe Pose
mp_pose = mp.solutions.pose
pose = mp_pose.Pose()
mp_drawing = mp.solutions.drawing_utils

# Data storage path
BASE_PATH = 'C:/Users/Admin/Desktop/motion_data'
IMAGE_FOLDER_NAME = "images"
COORD_FOLDER_NAME = "좌표"
os.makedirs(BASE_PATH, exist_ok=True)

# Function to create a new numbered folder for images
def create_new_image_folder(base_path):
    folder_path = os.path.join(base_path, IMAGE_FOLDER_NAME)
    os.makedirs(folder_path, exist_ok=True)
    existing_folders = [int(name) for name in os.listdir(folder_path) if name.isdigit()]
    next_number = max(existing_folders) + 1 if existing_folders else 1
    sub_folder_path = os.path.join(folder_path, str(next_number))
    os.makedirs(sub_folder_path, exist_ok=True)
    return sub_folder_path, next_number

# Function to draw pose landmarks on a black background
def draw_landmarks_on_black(image, pose_landmarks):
    black_image = np.zeros_like(image)
    mp_drawing.draw_landmarks(
        black_image, pose_landmarks, mp_pose.POSE_CONNECTIONS,
        mp_drawing.DrawingSpec(color=(0, 255, 0), thickness=2, circle_radius=3),
        mp_drawing.DrawingSpec(color=(0, 0, 255), thickness=2, circle_radius=2)
    )
    return black_image

# Main function to collect data
def collect_pose_data():
    cap = cv2.VideoCapture(0)
    is_saving = False
    collected_data = []
    image_folder_path, coord_folder_number = None, None
    frame_count = 0

    print("Press 'SPACE' to start/stop saving data. Press 'ESC' to exit.")

    while True:
        ret, frame = cap.read()
        if not ret:
            print("Failed to read from webcam.")
            break

        # Convert frame to RGB for MediaPipe
        image_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        results = pose.process(image_rgb)

        # Draw landmarks if available
        if results.pose_landmarks:
            mp_drawing.draw_landmarks(
                frame, results.pose_landmarks, mp_pose.POSE_CONNECTIONS,
                mp_drawing.DrawingSpec(color=(0, 255, 0), thickness=2, circle_radius=3),
                mp_drawing.DrawingSpec(color=(0, 0, 255), thickness=2, circle_radius=2)
            )

            # If saving is active, store data
            if is_saving:
                keypoints = [[lmk.x, lmk.y, lmk.z] for lmk in results.pose_landmarks.landmark]
                collected_data.append(keypoints)

                # Save image to the current folder
                img_file_name = f"frame_{frame_count:03d}.jpg"
                img_file_path = os.path.join(image_folder_path, img_file_name)
                black_background = draw_landmarks_on_black(frame, results.pose_landmarks)
                cv2.imwrite(img_file_path, black_background)
                frame_count += 1

                # Display the number of images saved
                cv2.putText(frame, f"Saved images: {frame_count}", (10, 60), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 2)

                # Stop saving if 120 images are captured
                if frame_count >= 120:
                    is_saving = False
                    if collected_data:
                        npy_file_name = f"keypoints_{coord_folder_number}.npy"
                        coord_folder_path = os.path.join(BASE_PATH, COORD_FOLDER_NAME)
                        os.makedirs(coord_folder_path, exist_ok=True)
                        npy_file_path = os.path.join(coord_folder_path, npy_file_name)
                        np.save(npy_file_path, np.array(collected_data))
                        print(f"Coordinates saved to: {npy_file_path}")
                    print("Saved 120 images. Stopped saving.")

        # Show FPS in red text
        fps = int(cap.get(cv2.CAP_PROP_FPS))
        cv2.putText(frame, f"FPS: {fps}", (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 2)

        # Display "Recording" if saving
        if is_saving:
            cv2.putText(frame, "Recording", (10, 90), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)

        # Display the frame
        cv2.imshow("Pose Capture", frame)

        # Handle key inputs
        key = cv2.waitKey(1) & 0xFF
        if key == ord(' '):  # Spacebar pressed
            if not is_saving:
                # Start a new cycle: create new folders
                image_folder_path, coord_folder_number = create_new_image_folder(BASE_PATH)
                collected_data = []
                frame_count = 0  # Reset frame count
                print(f"Started saving to: {image_folder_path} and keypoints_{coord_folder_number}")
                is_saving = True

        elif key == 27:  # ESC pressed
            print("Exiting...")
            break

    cap.release()
    cv2.destroyAllWindows()
    pose.close()

# Run the data collection
collect_pose_data()

 

*MediaPipe는 동영상이나 실시간 웹캠 입력에서 사람의 주요 관절 위치(예: 어깨, 엉덩이, 무릎, 발목 등)를 추출한다. 스쿼트, 플랭크, 런지와 같은 운동 동작을 녹화한 데이터를 사용하여 관절의 좌표를 시간에 따라 추출한다.

*데이터 전처리를 위해 관절 좌표 데이터를 시계열 데이터로 변환한다. 자세의 올바름을 판단하기 위해 무릎 각도, 허리 각도, 관절 간 거리 등 추가적인 계산을 통해 특징(feature)을 생성한다.

 


2. 모델 학습

LSTM_modeling.ipynb 에서 수집된 관절 좌표 데이터를 학습데이터로 사용하여 LSTM 모델을 학습시켰다. LSTM은 시계열 데이터 처리를 위한 신경망 구조로, 시간에 따른 관절 좌표의 변화를 학습하여 특정 동작을 인식할 수 있다. 모델은 입력된 관절 좌표 시퀀스를 분석하여 해당 동작이 무엇인지 예측하도록 훈련된다.

먼저, .npy 형식으로 저장된 운동 데이터를 불러오고, 이를 훈련 데이터와 레이블로 분리한다. 데이터를 정규화(표준화)하여 학습의 효율을 높이고, 시계열 형태로 변환하여 LSTM 모델에 입력할 수 있도록 준비한다.

import os
import numpy as np
from sklearn.model_selection import train_test_split
from torch.utils.data import DataLoader, TensorDataset
import torch
import torch.nn as nn
import torch.optim as optim

# 데이터 경로 설정
base_dir = "C:/Users/Admin/Desktop/data"
categories = ["back", "squat", "side"]

def load_data(base_dir, categories, seq_len=90):
    """
    데이터 로드 및 크기 조정 (패딩 또는 잘라내기).
    Args:
        base_dir (str): 데이터가 저장된 기본 경로.
        categories (list): 행동 카테고리 리스트.
        seq_len (int): 고정된 시퀀스 길이.
    Returns:
        np.array: 데이터 배열.
        np.array: 라벨 배열.
    """
    data = []
    labels = []
    label_map = {category: idx for idx, category in enumerate(categories)}  # 라벨 매핑

    for category in categories:
        category_path = os.path.join(base_dir, category, "좌표")
        print(f"Checking files in: {category_path}")
        for i in range(1, 51):  # keypoints_1 ~ keypoints_50
            file_path = os.path.join(category_path, f"keypoints_{i}.npy")
            if os.path.exists(file_path):
                keypoints = np.load(file_path)  # 데이터 로드
                if keypoints.shape[0] > seq_len:  # 길이가 seq_len보다 길면 잘라내기
                    keypoints = keypoints[:seq_len]
                elif keypoints.shape[0] < seq_len:  # 길이가 seq_len보다 짧으면 패딩
                    pad_width = seq_len - keypoints.shape[0]
                    keypoints = np.pad(keypoints, ((0, pad_width), (0, 0), (0, 0)), mode='constant')
                data.append(keypoints)
                labels.append(label_map[category])
            else:
                print(f"File not found: {file_path}")

    return np.array(data), np.array(labels)

# 수정된 데이터 로드 실행
data, labels = load_data(base_dir, categories)
print(f"Data shape: {data.shape}, Labels shape: {labels.shape}")
import os
import numpy as np

# 데이터 경로 및 라벨 매핑
data_paths = {
    "back": "C:/Users/Admin/Desktop/data/back/좌표",
    "squat": "C:/Users/Admin/Desktop/data/squat/좌표",
    "side": "C:/Users/Admin/Desktop/data/side/좌표"
}
labels_map = {"back": 0, "squat": 1, "side": 2}  # 라벨 매핑

def load_data(data_paths, labels_map, seq_len=90):
    """
    데이터 로드 및 크기 조정 (패딩 또는 잘라내기).
    Args:
        data_paths (dict): 카테고리별 데이터 경로.
        labels_map (dict): 카테고리와 라벨 매핑.
        seq_len (int): 고정된 시퀀스 길이.
    Returns:
        np.array: 데이터 배열.
        np.array: 라벨 배열.
    """
    data = []
    labels = []
    for category, path in data_paths.items():
        print(f"Checking files in: {path}")
        for i in range(1, 51):  # keypoints_1 ~ keypoints_50
            file_path = os.path.join(path, f"keypoints_{i}.npy")
            if os.path.exists(file_path):
                keypoints = np.load(file_path)  # 데이터 로드
                if keypoints.shape[0] > seq_len:  # 길이가 seq_len보다 길면 잘라내기
                    keypoints = keypoints[:seq_len]
                elif keypoints.shape[0] < seq_len:  # 길이가 seq_len보다 짧으면 패딩
                    pad_width = seq_len - keypoints.shape[0]
                    keypoints = np.pad(keypoints, ((0, pad_width), (0, 0), (0, 0)), mode='constant')
                data.append(keypoints)
                labels.append(labels_map[category])
            else:
                print(f"File not found: {file_path}")
    return np.array(data), np.array(labels)
import os

for label_name, path in data_paths.items():
    print(f"Checking files in: {path}")
    print(os.listdir(path))

# 데이터 로드
X, y = load_data(data_paths, labels_map)

# 데이터 크기 확인
print("데이터 크기:", X.shape, y.shape)  # Expected output: (30, frames, joints, coords), (30,)
def standardize_data(data):
    """
    데이터 표준화 (평균 0, 표준편차 1).
    Args:
        data (np.array): 원본 데이터, Shape: (samples, frames, joints, coords).
    Returns:
        np.array: 표준화된 데이터.
    """
    mean = data.mean(axis=(1, 2, 3), keepdims=True)  # 샘플별 평균 계산
    std = data.std(axis=(1, 2, 3), keepdims=True)  # 샘플별 표준편차 계산
    standardized_data = (data - mean) / (std + 1e-8)  # 표준화
    return standardized_data

# 데이터 표준화 적용
X_standardized = standardize_data(X)
# 데이터 분할 (훈련/검증/테스트 세트)
X_train, X_temp, y_train, y_temp = train_test_split(X_standardized, y, test_size=0.3, random_state=42, stratify=y)
X_val, X_test, y_val, y_test = train_test_split(X_temp, y_temp, test_size=0.5, random_state=42, stratify=y_temp)

print(f"Train shape: {X_train.shape}, Validation shape: {X_val.shape}, Test shape: {X_test.shape}")

 

LSTM 모델은 연속된 시간 데이터(시계열 데이터)를 학습하여 특정 패턴을 인식하는 신경망 구조를 가지며, 운동 동작이 연속적인 움직임으로 이루어지기 때문에 효과적으로 활용될 수 있다. 여기에서는 입력층, LSTM 계층, Fully Connected(완전 연결) 계층으로 구성된 모델을 사용하여 운동 동작을 분류하도록 설계했다.

# LSTM 모델 정의
class LSTMModel(nn.Module):
    def __init__(self, input_dim, hidden_dim, output_dim, num_layers=3):
        super(LSTMModel, self).__init__()
        self.lstm = nn.LSTM(input_dim, hidden_dim, num_layers, batch_first=True)
        self.fc = nn.Linear(hidden_dim, output_dim)

    def forward(self, x):
        out, _ = self.lstm(x)  # LSTM 출력
        out = self.fc(out[:, -1, :])  # 마지막 타임스텝의 출력만 사용
        return out
    
# 모델 초기화
input_dim = X_train.shape[2] * X_train.shape[3]  # 관절 수 * 좌표 수
hidden_dim = 256
output_dim = len(labels_map)  # 클래스 수 (등, 스쿼트, 옆구리)
model = LSTMModel(input_dim=input_dim, hidden_dim=hidden_dim, output_dim=output_dim)

# 데이터 로더 생성
train_dataset = TensorDataset(torch.tensor(X_train).float(), torch.tensor(y_train).long())
val_dataset = TensorDataset(torch.tensor(X_val).float(), torch.tensor(y_val).long())
train_loader = DataLoader(train_dataset, batch_size=4, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=4)
from sklearn.utils.class_weight import compute_class_weight

# 손실 함수 및 옵티마이저
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

# 손실 함수 가중치 조정
class_weights = compute_class_weight('balanced', classes=np.unique(y_train), y=y_train)
criterion = nn.CrossEntropyLoss(weight=torch.tensor(class_weights, dtype=torch.float))
from torch.optim.lr_scheduler import StepLR
import numpy as np
import torch

# EarlyStopping 클래스 정의
class EarlyStopping:
    def __init__(self, patience=7, min_delta=0):
        self.patience = patience
        self.min_delta = min_delta
        self.counter = 0
        self.best_loss = np.inf
        self.early_stop = False

    def __call__(self, val_loss):
        if self.best_loss - val_loss > self.min_delta:
            self.best_loss = val_loss
            self.counter = 0
        else:
            self.counter += 1
            if self.counter >= self.patience:
                self.early_stop = True

# 학습률 스케줄러 추가
scheduler = StepLR(optimizer, step_size=5, gamma=0.5)

# EarlyStopping 객체 생성
early_stopping = EarlyStopping(patience=7, min_delta=0.0001)

# 학습 루프
num_epochs = 100  # 에포크를 100번으로 설정
for epoch in range(num_epochs):
    model.train()
    train_loss = 0
    for X_batch, y_batch in train_loader:
        optimizer.zero_grad()
        X_batch = X_batch.view(X_batch.size(0), -1, input_dim)
        y_pred = model(X_batch)
        loss = criterion(y_pred, y_batch)
        loss.backward()
        optimizer.step()
        train_loss += loss.item()

    # 학습률 스케줄러 업데이트
    scheduler.step()

    # 검증 단계
    model.eval()
    val_loss = 0
    with torch.no_grad():
        for X_batch, y_batch in val_loader:
            X_batch = X_batch.view(X_batch.size(0), -1, input_dim)
            y_pred = model(X_batch)
            val_loss += criterion(y_pred, y_batch).item()

    print(f"Epoch {epoch + 1}/{num_epochs}, Train Loss: {train_loss / len(train_loader):.4f}, Validation Loss: {val_loss / len(val_loader):.4f}")

    # Early Stopping 확인
    early_stopping(val_loss / len(val_loader))
    if early_stopping.early_stop:
        print("Early stopping triggered. Training stopped.")
        break

78번째에서 멈춤

모델은 일정한 학습 에포크 동안 훈련되며, 손실 함수와 최적화 알고리즘을 적용하여 점진적으로 성능을 개선한다. 학습이 완료된 후, 최적의 가중치를 저장하여 실시간 예측 단계에서 활용할 수 있도록 한다.

print(f"Data min: {X.min()}, Data max: {X.max()}")
print(f"Class distribution: {np.bincount(y)}")
# 테스트 데이터 평가
from sklearn.metrics import classification_report
model.eval()
y_true, y_pred = [], []
test_loader = DataLoader(TensorDataset(torch.tensor(X_test).float(), torch.tensor(y_test).long()), batch_size=4)
with torch.no_grad():
    for X_batch, y_batch in test_loader:
        X_batch = X_batch.view(X_batch.size(0), -1, input_dim)  # 차원 변환
        outputs = model(X_batch)
        _, preds = torch.max(outputs, 1)
        y_true.extend(y_batch.numpy())
        y_pred.extend(preds.numpy())

print(classification_report(y_true, y_pred, target_names=list(labels_map.keys())))

정확도 높음 확인

from sklearn.metrics import classification_report, confusion_matrix
print(confusion_matrix(y_true, y_pred))

정확도 높음 확인

import torch
import torch.nn as nn
import os

# TorchScript로 변환
scripted_model = torch.jit.script(model)

# 경로 설정
output_dir = "C:/Users/Admin/Desktop"
os.makedirs(output_dir, exist_ok=True)  # 디렉토리가 없으면 생성
model_path = os.path.join(output_dir, "lstm_model_scripted.pt")

# TorchScript 모델 저장
scripted_model.save(model_path)
print(f"Scripted model saved to {model_path}")

 


3. 실시간 동작 인식

운동 동작을 실시간으로 감지하고 분석하기 위해 LSTM_gui.py 스크립트를 만들었다. 이 스크립트는 웹캠을 통해 실시간으로 사용자 움직임을 감지하고, 학습된 LSTM 모델을 사용하여 현재 수행 중인 운동 동작을 분류한다.

웹캠에서 받아온 프레임을 MediaPipe Pose 모듈을 활용하여 처리하고, 관절 좌표를 추출한다. 이후, 추출된 좌표 데이터를 모델이 학습한 형식과 동일하게 변환한 후, LSTM 모델에 입력하여 현재 수행 중인 운동 동작이 무엇인지 예측한다.

이 과정에서 사용자가 실행하는 동작이 스쿼트인지, 백인지, 사이드인지를 판단할 수 있으며, 가장 유사한 동작과 정확도를 GUI 화면에 출력한다. 예측 결과를 실시간으로 사용자에게 피드백 형태로 제공할 수 있도록 OpenCV 기반의 GUI를 구현하였으며, 잘못된 자세를 감지하면 교정 메시지를 출력할 수도 있다.

import cv2
import mediapipe as mp
import numpy as np
import torch

# Mediapipe 초기화
mp_pose = mp.solutions.pose
pose = mp_pose.Pose()
mp_drawing = mp.solutions.drawing_utils  # 관절 시각화를 위한 유틸리티

# 웹캠 열기
cap = cv2.VideoCapture(0)

# LSTM 모델 불러오기
model_path = "C:/Users/Admin/Desktop/lstm_model_scripted_2.pt"  # 사전 저장된 LSTM 모델 경로
model = torch.jit.load(model_path)
model.eval()

# 라벨 매핑
labels_map = {0: "Back", 1: "Squat", 2: "Side"}

# 관절 데이터 저장 변수
is_collecting = False  # 관절 추출 상태 플래그
keypoints_list = []  # 관절 데이터 저장

# 표준화 함수
def standardize_data(data):
    """
    데이터 표준화 (평균 0, 표준편차 1).
    Args:
        data (np.array): 원본 데이터, Shape: (samples, frames, joints, coords).
    Returns:
        np.array: 표준화된 데이터.
    """
    mean = data.mean(axis=(1, 2, 3), keepdims=True)  # 샘플별 평균 계산
    std = data.std(axis=(1, 2, 3), keepdims=True)  # 샘플별 표준편차 계산
    standardized_data = (data - mean) / (std + 1e-8)  # 표준화
    return standardized_data

# 실시간 예측 루프
while cap.isOpened():
    ret, frame = cap.read()
    if not ret:
        break

    # Mediapipe로 관절 데이터 추출
    image = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
    result = pose.process(image)

    # 관절 데이터 추출 및 시각화
    if result.pose_landmarks:
        # Mediapipe 유틸리티로 관절 시각화
        mp_drawing.draw_landmarks(
            frame,  
            result.pose_landmarks, 
            mp_pose.POSE_CONNECTIONS,  # 관절 연결선
            mp_drawing.DrawingSpec(color=(0, 255, 0), thickness=2, circle_radius=4),  # 관절점 스타일
            mp_drawing.DrawingSpec(color=(0, 0, 255), thickness=2)  # 연결선 스타일
        )

        # 33개의 모든 관절 좌표 추출 (x, y, z)
        keypoints = np.array([[lm.x, lm.y, lm.z] for lm in result.pose_landmarks.landmark])

        # 관절 추출 상태에서 데이터 저장
        if is_collecting:
            keypoints_list.append(keypoints)

        # 관절 추출 상태 표시
        status_text = "Collecting" if is_collecting else "Paused"
        cv2.putText(frame, f"Status: {status_text}", (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)

    # 영상 출력
    cv2.imshow("Live Feed", frame)

    # 키 입력 처리
    key = cv2.waitKey(1) & 0xFF
    if key == ord('q'):  # 'q'를 누르면 종료
        break
    elif key == ord(' '):  # 스페이스바로 상태 토글
        is_collecting = not is_collecting
        if not is_collecting and keypoints_list:  # 관절 추출 종료 시 모델에 데이터 입력
            # 쌓인 관절 데이터를 배열로 변환
            keypoints_array = np.array(keypoints_list)  # (시퀀스 길이, 33, 3)
            
            # 데이터 정규화
            keypoints_standardized = standardize_data(np.expand_dims(keypoints_array, axis=0))[0]  # 정규화 후 원래 형태로 복원

            # 모델 입력 형태 변환 (배치, 시퀀스 길이, 관절 수 * 좌표 수)
            input_data = torch.tensor(keypoints_standardized).float()
            input_data = input_data.view(1, keypoints_standardized.shape[0], -1)  # (1, 시퀀스 길이, 33*3)

            # 모델 예측
            with torch.no_grad():
                outputs = model(input_data)  # 모델 출력: logits
                probabilities = torch.softmax(outputs, dim=1)  # 확률로 변환
                predicted_class = torch.argmax(probabilities, dim=1).item()  # 예측 클래스
                confidence = probabilities[0, predicted_class].item()  # 예측 확률

            # 결과 출력
            predicted_label = labels_map[predicted_class]
            confidence_percent = confidence * 100  # 퍼센트로 변환
            print(f"Predicted: {predicted_label}, Confidence: {confidence_percent:.2f}%")

            # 각 동작별 확률 출력
            print("Probabilities:")
            for i, prob in enumerate(probabilities[0]):
                label = labels_map[i]
                print(f"  {label}: {prob.item() * 100:.2f}%")

            # 저장된 관절 데이터 초기화
            keypoints_list = []

# 종료
cap.release()
cv2.destroyAllWindows()

 

이 프로젝트는 MediaPipe와 LSTM을 결합하여 실시간 운동 동작을 분석하는 시스템을 구축하였다. MediaPipe를 활용하여 실시간으로 사람의 관절 좌표를 추출하고, 이를 시계열 데이터로 변환하여 LSTM 모델이 학습하도록 하였다. 최종적으로, 학습된 모델을 실시간 예측 시스템에 적용하여 운동 동작을 감지하고 사용자에게 피드백을 제공할 수 있도록 설계되었다.


이 프로젝트에 전이학습(Transfer Learning)을 포함하는 방법도 함께 알아보았다. 위 프로젝트는 LSTM을 처음부터 학습하여 운동 동작을 분류하는 방식으로 되어 있다. 그러나 전이학습을 적용하면 학습 속도를 높이고, 더 적은 데이터로도 높은 성능을 낼 수 있다고 한다.

전이학습을 적용하는 가장 적절한 방법은 사전 학습된 포즈 추출 모델 또는 행동 인식 모델을 활용하는 것이다. 이를 통해 특징 추출 부분을 고정하고, 운동 동작을 분류하는 부분만 새롭게 학습하면 효과적이다. 전이학습 방법으로는 아래와 같이 두 가지로 볼 수 있겠다.

✅ LSTM 모델을 처음부터 학습하지 않고, 사전 학습된 LSTM 모델을 불러와 일부 층만 미세 조정(Fine-Tuning)
✅ MediaPipe 대신 OpenPose 또는 MoveNet을 활용하여 더 정밀한 관절 데이터를 생성

(1) 사전 학습된 LSTM 모델을 활용하는 방법

현재 LSTM 모델을 처음부터 학습하는 대신, 기존에 사람의 동작을 학습한 사전 학습된 LSTM 모델을 불러와서 미세 조정(Fine-Tuning) 하면 된다.

  • 예를 들어, Human Activity Recognition (HAR) LSTM 모델처럼 이미 일반적인 행동 인식을 학습한 모델을 가져와 운동 자세 분류에 맞게 추가 학습할 수 있다.
  • HAR LSTM 모델은 걷기, 뛰기, 앉기 등 일반적인 움직임을 학습한 모델이므로, 이를 운동 동작(스쿼트, 런지, 플랭크 등)에 맞게 미세 조정(Fine-Tuning)하면 높은 성능을 얻을 수 있다.

📌 전이 학습 적용 코드 (기존 모델 불러와서 미세 조정하기)

import torch
import torch.nn as nn

# 1. 사전 학습된 LSTM 모델 로드
pretrained_model = torch.load("pretrained_LSTM.pth")  # 기존 HAR 모델 로드
pretrained_model.requires_grad_(False)  # 기존 가중치는 고정

# 2. 출력 레이어만 새로운 운동 동작에 맞게 수정
num_new_classes = 3  # 예: 스쿼트, 런지, 플랭크
pretrained_model.fc = nn.Linear(pretrained_model.fc.in_features, num_new_classes)

# 3. 미세 조정(Fine-Tuning) 학습
optimizer = torch.optim.Adam(pretrained_model.parameters(), lr=0.001)
criterion = nn.CrossEntropyLoss()

 

(2) 포즈 추출 모델(Feature Extractor) 변경

기존 프로젝트에서는 motion_data.py에서 MediaPipe Pose를 사용하여 사람의 관절 데이터를 추출하고 있다. 그러나 더 강력한 사전 학습된 포즈 인식 모델(예: OpenPose, MoveNet)을 활용하면 더 정확한 자세 데이터를 얻을 수 있다.

 

  • MediaPipe는 가볍고 빠르지만, 정확도가 OpenPose보다 낮다.
  • OpenPose는 더 정밀한 3D 관절 데이터를 제공하므로, 더 나은 자세 분석이 가능하다.
  • MoveNet(TensorFlow)는 최신 포즈 추출 모델로, 빠르면서도 정확한 성능을 제공한다.

📌 MediaPipe 대신 OpenPose 사용 예시 (motion_data.py 수정)

import cv2
from openpose import pyopenpose as op

# OpenPose 초기화
params = {"model_folder": "models/"}
openpose = op.WrapperPython()
openpose.configure(params)
openpose.start()

cap = cv2.VideoCapture(0)

while cap.isOpened():
    ret, frame = cap.read()
    if not ret:
        break
    
    # OpenPose로 관절 추출
    datum = op.Datum()
    datum.cvInputData = frame
    openpose.emplaceAndPop([datum])

    # 추출된 관절 좌표 활용
    keypoints = datum.poseKeypoints
    print(keypoints)  # LSTM 입력 데이터로 활용 가능

    cv2.imshow("Pose Estimation", datum.cvOutputData)
    if cv2.waitKey(1) & 0xFF == ord('q'):
        break