Bull Queue?

Bull Queue

BullNode.js에서 사용하는 강력한 작업 큐(Job queue) 라이브러리로, Redis를 기반으로 동작한다. 이를 통해 백그라운드 작업을 효율적으로 관리하거나, 대규모의 비동기 작업을 처리할 수 있다. Bull은 작업을 대기, 진행, 완료, 실패 등 다양한 상태로 관리하며, 분산 환경에서도 안정적으로 동작하도록 설계되었다.

 

주요 기능

  • 작업 상태 관리: 작업을 대기 상태(waiting), 진행 중(active), 완료(completed), 실패(failed)로 관리한다. 각 작업의 상태를 Redis에서 조회할 수 있어 추적 가능이 용이하다.
  • 작업 재시도 및 지연 처리: 작업 실패 시 재시도(retry) 가능. 작업을 특정 시간 뒤에 실행하도록 지연(queue delay) 설정도 가능하다.
  • 작업 스케줄링: 특정 시간 / 간격으로 작업을 예약할 수 있다.
  • 병렬 처리 지원: 여러 작업을 병렬로 실행하여 성능을 높인다.
  • 작업 우선순위: 큐에 추가되는 작업의 우선순위를 지정할 수 있다. (높은 우선순위 작업부터 처리).
  • 속도 제한: 작업 처리 속도를 제한(rate limiting)하여 서버나 외부 API의 부하를 제어한다.
  • 분산 환경 지원: 여러 프로세스나 서버에서 하나의 Bull Queue를 공유하여 처리가 가능하다.

 

bull의 장점을 요약하자면 강력한 백그라운드 작업 처리 능력, Redis를 기반으로 하여 확장성성능 보장, 다양한 고급 기능(재시도, 속도 제한, 스케줄링 등) 내장, 실시간 작업 상태 모니터링 가능 등이 있다. 따라서 bull을 프로젝트에 적용한다면 성능적으로 최적화가 동반된다는 확신이 들어, 이번에는 프로젝트에 bull을 도입하는 시간을 가지겠다!

 

bull 적용하기

Bull Queue는 Redis를 백엔드 데이터 저장소로 사용하므로, 먼저 Redis를 설치하고 세팅해야한다. 다행히도 저번 시간에 말했듯, 이미 Redis Cluster를 적용하며 Redis 관련된 세팅은 다 되어 있으므로 일단 가보자.

 

설정 처리

import Queue from 'bull';
import { REDIS_HOST, REDIS_PORT } from '../../../constants/env.js';

// 큐 인스턴스 저장 MAP
const queues = new Map();

const defaultOptions = {
  redis: {
    host: REDIS_HOST,
    port: REDIS_PORT + 1000,
  },
  // 기본 작업 옵션
  defaultJobOptions: {
    attempts: 3, // 재시도 횟수
    removeOnComplete: 100, // 완료된 작업 100개까지만 보관
    removeOnFail: 100, // 실패한 작업 100개까지만 보관
  },
};

// 큐 생성
const createQueue = (queueName) => {
  if (!queues.has(queueName)) {
    const queue = new Queue(queueName, defaultOptions);

    // Queue 이벤트 리스너
    queue.on('error', (err) => {
      console.error(`Queue ${queueName} 에러: `, err);
    });

    queue.on('waiting', (jobId) => {
      console.log(`Job ${jobId} 대기.`);
    });

    queue.on('active', (job) => {
      console.log(`Job ${job.id} 실행.`);
    });

    queue.on('completed', (job) => {
      console.log(`Job ${job.id} 완료.`);
    });

    queue.on('failed', (job, err) => {
      console.error(`Job ${job.id} 실패: `, err);
    });

    queues.set(queueName, queue);
  }

  return queues.get(queueName);
};

// 큐 불러오기
const getQueue = (queueName) => {
  return queues.get(queueName);
};

// 모든 큐 불러오기
const getAllQueues = () => {
  return Array.from(queues.values());
};

// 큐 제거
const removeQueue = async (queueName) => {
  const queue = queues.get(queueName);
  if (queue) {
    await queue.close();
    queues.delete(queueName);
  }
};

// 모든 큐 종료
const closeAllQueues = async () => {
  for (const queue of queues.values()) {
    // 각 작업 비동기로 실행
    await queue.pause(true);
    await queue.empty();

    // 모든 작업 삭제
    await Promise.all([
      queue.clean(0, 'completed'),
      queue.clean(0, 'failed'),
      queue.clean(0, 'delayed'),
      queue.clean(0, 'wait'),
      queue.clean(0, 'active'),
    ]);

    // 모든 키 삭제
    await queue.obliterate({ force: true });
    await queue.close();
  }

  queues.clear();
  console.log('모든 Bull Queue 정리.');
};

export { createQueue, getQueue, getAllQueues, removeQueue, closeAllQueues };

Redis 관련 Manager 파일을 만들어와서 그런지, bull queue 세팅은 그리 오랜 시간이 필요하지는 않았다. 뭔가 뿌듯하기도 하고... 불안하기도 하고...

 

다음은 bull queue가 저장될 포트를 docker container에 추가해줘야 한다. 나는 터미널로 redis-bull 이라는 이름으로 추가해줬다. 세팅은 이것으로 완료!

 

이 다음은 과연 bull queue를 어디에 적용해볼지 생각하였다. 그런데 지금까지 진행된 프로젝트에 마땅한 사용처가 없네...? 물론 이후에는 동시성 처리라든가 작업 비동기적 관리 등의 장점으로 몬스터 킬 처리, 보스 막타 처리 등등 적용할 요소가 많다. 그리고 그걸 생각하고 bull queue를 도입하자 마음 먹었고... 그런데 아직 완성된 게 없다...

그래서 나는 유저 입장 시에 bull queue를 적용하기로 정했다. 왜냐면 bull queue의 장점 중에는 작업 지연 및 예약이라는  마치 알람 기능과 같은 역할이 있거든. 물론 실시간성이 중요한 경우, bull queue와는 어울리지 않는다. 매우 짧은 시간이라도 프로세스가 하나 더 생기는 과정이고 비동기적으로 수행하기 때문이다. 하지만 우리 프로젝트의 경우, 많은 유저를 수용할 생각도 없고 몸으로 체감될 지연이 있지도 않을 거라 판단. 따라서 유저가 입장하고 몇 초 뒤, 모든 유저에게 Notification으로 유저 입장 알림을 전송해보자! 라는 생각으로 bull queue를 적용했다.

 

작업 추가 로직

const addEnterJob = async (socket) => {
  // job 반환 => finished() 메서드 사용 가능
  return await enterQueue.add(
    { socketId: socket.id },
    {
      attempts: 3, // 재시도 횟수
      timeout: 5000, // 타임아웃
    },
  );
};

말그대로 작업을 추가해준다. bull queue는 크게 3가지 로직으로 처리되는데, 그 중 첫 번째인 작업 추가. return 값으로 Job을 반환하는 이유는 주석에서 볼 수 있는 것처럼 finished() 메서드를 사용하기 위해서다.

 

Worker 처리 코드

enterQueue.process(async (job) => {
  const { socketId } = job.data;

  const userRedis = await getRedisUserById(socketId);
  const userSession = getUserSessionById(socketId);

  const playerPayload = {
    playerId: parseInt(socketId),
    nickname: userRedis.nickname,
    class: userRedis.myClass,
    transform: getUserTransformById(userRedis.id),
  };

  // 접속한 유저에게 응답
  const response = createResponse(PACKET_ID.S_Enter, { player: playerPayload });
  userSession.socket.write(response);

  const users = await getRedisUsers();
  const userSessions = getUserSessions();

  // 해당 유저에게는 다른 유저 정보를 S_Spawn으로 전달.
  const otherUserPayload = {
    players: users
      .filter((player) => player.id !== socketId)
      .map((player) => ({
        playerId: parseInt(player.id),
        nickname: player.nickname,
        class: player.myClass,
        transform: getUserTransformById(player.id),
      })),
  };

  if (otherUserPayload.players.length > 0) {
    const notification = createNotificationPacket(PACKET_ID.S_Spawn, otherUserPayload);
    userSession.socket.write(notification);
  }

  const userNotification = createNotificationPacket(PACKET_ID.S_Spawn, {
    players: [playerPayload],
  });

  userSessions.forEach((u) => {
    if (u.socket.id !== socketId) {
      u.socket.write(userNotification);
    }
  });

  // Sleep 함수 - 1초 대기 후 실행
  const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));
  await sleep(1000);

  userSessions.forEach((u) => {
    const chatPayload = {
      playerId: userRedis.id,
      chatMsg: `${userRedis.nickname}님이 게임에 입장하셨습니다!`,
    };

    u.socket.write(createResponse(PACKET_ID.S_Chat, chatPayload));
  });

  return { success: true };
});

기존 코드의 로직을 Job Process에 넣었다. 따라서 해당 코드는 비동기적으로 관리될 것이고, Sleep함수를 이용하여 1초 뒤에 입장 알람이 전송되게끔 로직을 작성했다.

 

오늘은 순조롭게 bull queue 적용을 완성할 수 있었다. 앞서 Redis 관련 작업을 해와서 그런지 그리 어려운 점은 없었다. 다만 bull queue가 적용되면 좋을 부분과 그리 장점이 되지 않을 부분을 구분하기가 까다로웠다. 아직 bull queue를 능란하게 사용한 적이 없어 그렇겠지... 앞으로 자주 사용해서 열심히 공부해보자!

 

 

 

'Side Projects' 카테고리의 다른 글

Fluent Validation? Bcrypt?  (0) 2024.12.08
redis cluster 사용해보기  (1) 2024.11.27
redis / ioredis 패키지 비교  (1) 2024.11.18
TCP Multi-Player - 트러블 슈팅  (0) 2024.11.05
ORM / Low-Level Query  (0) 2024.10.29