[Daily morning study] Redis Pub/Sub๊ณผ Stream ํ™œ์šฉ

#daily morning study

Image


Redis Pub/Sub๊ณผ Stream ํ™œ์šฉ

Pub/Sub์ด๋ž€

Pub/Sub(Publish/Subscribe)์€ ๋ฉ”์‹œ์ง€๋ฅผ ๋ฐœํ–‰(publish)ํ•˜๋Š” ์ชฝ๊ณผ ๊ตฌ๋…(subscribe)ํ•˜๋Š” ์ชฝ์„ ๋ถ„๋ฆฌํ•˜๋Š” ๋ฉ”์‹œ์ง• ํŒจํ„ด์ด๋‹ค.

  • Publisher: ํŠน์ • ์ฑ„๋„์— ๋ฉ”์‹œ์ง€๋ฅผ ๋ณด๋‚ธ๋‹ค
  • Subscriber: ์ฑ„๋„์„ ๊ตฌ๋…ํ•˜๊ณ , ๋ฉ”์‹œ์ง€๊ฐ€ ์˜ค๋ฉด ์ฆ‰์‹œ ๋ฐ›๋Š”๋‹ค
  • Redis๊ฐ€ ์ค‘๊ฐ„ ๋ธŒ๋กœ์ปค ์—ญํ• ์„ ํ•œ๋‹ค
Publisher โ†’ [Redis Channel] โ†’ Subscriber 1
                           โ†’ Subscriber 2
                           โ†’ Subscriber N

Redis Pub/Sub ๊ธฐ๋ณธ ๋ช…๋ น์–ด

# ์ฑ„๋„ ๊ตฌ๋…
SUBSCRIBE chat-room

# ํŒจํ„ด์œผ๋กœ ๊ตฌ๋… (glob ํŒจํ„ด ์ง€์›)
PSUBSCRIBE chat-*

# ๋ฉ”์‹œ์ง€ ๋ฐœํ–‰
PUBLISH chat-room "์•ˆ๋…•ํ•˜์„ธ์š”"

# ํ˜„์žฌ ํ™œ์„ฑ ์ฑ„๋„ ๋ชฉ๋ก
PUBSUB CHANNELS

# ์ฑ„๋„์˜ ๊ตฌ๋…์ž ์ˆ˜
PUBSUB NUMSUB chat-room

Pub/Sub์˜ ํ•œ๊ณ„

Redis Pub/Sub์€ fire-and-forget ๋ฐฉ์‹์ด๋‹ค. ์ด ๋•Œ๋ฌธ์— ๋ช‡ ๊ฐ€์ง€ ์ œ์•ฝ์ด ์žˆ๋‹ค.

ํ•œ๊ณ„์„ค๋ช…
๋ฉ”์‹œ์ง€ ์œ ์‹ค๊ตฌ๋…์ž๊ฐ€ ์˜คํ”„๋ผ์ธ์ผ ๋•Œ ๋ฐœํ–‰๋œ ๋ฉ”์‹œ์ง€๋Š” ์‚ฌ๋ผ์ง„๋‹ค
์žฌ์ „์†ก ๋ถˆ๊ฐ€ํ•œ ๋ฒˆ ๋ฐœํ–‰๋œ ๋ฉ”์‹œ์ง€๋ฅผ ๋‹ค์‹œ ๋ฐ›์„ ์ˆ˜ ์—†๋‹ค
์†Œ๋น„ ํ™•์ธ ์—†์Œ๊ตฌ๋…์ž๊ฐ€ ์‹ค์ œ๋กœ ์ฒ˜๋ฆฌํ–ˆ๋Š”์ง€ ์•Œ ์ˆ˜ ์—†๋‹ค
๋ฉ”์‹œ์ง€ ์˜์†์„ฑ ์—†์ŒRedis ์žฌ์‹œ์ž‘ ์‹œ ๋ชจ๋‘ ์†Œ์‹ค๋œ๋‹ค

์ด๋Ÿฌํ•œ ํ•œ๊ณ„๋ฅผ ๊ทน๋ณตํ•˜๊ธฐ ์œ„ํ•ด ๋‚˜์˜จ ๊ฒƒ์ด Redis Stream์ด๋‹ค.

Redis Stream์ด๋ž€

Redis 5.0์—์„œ ์ถ”๊ฐ€๋œ ์ž๋ฃŒ๊ตฌ์กฐ๋กœ, Kafka์™€ ์œ ์‚ฌํ•œ append-only log ๋ฐฉ์‹์˜ ๋ฉ”์‹œ์ง€ ์ŠคํŠธ๋ฆผ์ด๋‹ค.

  • ๋ฉ”์‹œ์ง€๊ฐ€ ๋””์Šคํฌ์— ์˜์†์ ์œผ๋กœ ์ €์žฅ๋œ๋‹ค
  • ์†Œ๋น„์ž ๊ทธ๋ฃน(Consumer Group)์„ ํ†ตํ•ด ๋ฉ”์‹œ์ง€ ์ฒ˜๋ฆฌ ์ƒํƒœ๋ฅผ ์ถ”์ ํ•  ์ˆ˜ ์žˆ๋‹ค
  • ์†Œ๋น„์ž๊ฐ€ ์˜คํ”„๋ผ์ธ์ด์—ˆ๋‹ค๊ฐ€ ๋‹ค์‹œ ์—ฐ๊ฒฐํ•ด๋„ ๋†“์นœ ๋ฉ”์‹œ์ง€๋ฅผ ์ฝ์„ ์ˆ˜ ์žˆ๋‹ค

Stream ๊ธฐ๋ณธ ๋ช…๋ น์–ด

# ๋ฉ”์‹œ์ง€ ์ถ”๊ฐ€ (ID๋Š” ์ž๋™ ์ƒ์„ฑ: ํƒ€์ž„์Šคํƒฌํ”„-์‹œํ€€์Šค)
XADD orders * product "laptop" price "1200"
# ๊ฒฐ๊ณผ: "1716710400000-0"

# ๋ฒ”์œ„ ์กฐํšŒ (์ฒ˜์Œ๋ถ€ํ„ฐ ๋๊นŒ์ง€)
XRANGE orders - +

# ์ตœ์‹  ๋ฉ”์‹œ์ง€๋ถ€ํ„ฐ ์—ญ์ˆœ ์กฐํšŒ
XREVRANGE orders + - COUNT 5

# ์ŠคํŠธ๋ฆผ ๊ธธ์ด
XLEN orders

# ํŠน์ • ID ์ดํ›„ ๋ฉ”์‹œ์ง€ ์ฝ๊ธฐ (์ƒˆ ๋ฉ”์‹œ์ง€ ๋Œ€๊ธฐ, ๋ธ”๋กœํ‚น)
XREAD COUNT 10 BLOCK 0 STREAMS orders 1716710400000-0

Consumer Group

์—ฌ๋Ÿฌ ์†Œ๋น„์ž๊ฐ€ ๊ฐ™์€ ์ŠคํŠธ๋ฆผ์„ ๋ถ„์‚ฐ ์ฒ˜๋ฆฌํ•  ๋•Œ ์‚ฌ์šฉํ•œ๋‹ค. ๊ฐ ๋ฉ”์‹œ์ง€๋Š” ๊ทธ๋ฃน ๋‚ด ํ•˜๋‚˜์˜ ์†Œ๋น„์ž์—๊ฒŒ๋งŒ ์ „๋‹ฌ๋œ๋‹ค.

# Consumer Group ์ƒ์„ฑ (0๋ถ€ํ„ฐ ์‹œ์ž‘ = ์ฒ˜์Œ๋ถ€ํ„ฐ)
XGROUP CREATE orders order-group $ MKSTREAM

# Consumer Group์œผ๋กœ ๋ฉ”์‹œ์ง€ ์ฝ๊ธฐ
XREADGROUP GROUP order-group consumer-1 COUNT 5 STREAMS orders >

# ๋ฉ”์‹œ์ง€ ์ฒ˜๋ฆฌ ์™„๋ฃŒ ํ™•์ธ (ACK)
XACK orders order-group 1716710400000-0

# ์ฒ˜๋ฆฌ ์ค‘์ธ(ACK ์•ˆ ๋œ) ๋ฉ”์‹œ์ง€ ๋ชฉ๋ก
XPENDING orders order-group - + 10

> ๊ธฐํ˜ธ๋Š” ์•„์ง ์•„๋ฌด ์†Œ๋น„์ž์—๊ฒŒ๋„ ์ „๋‹ฌ๋˜์ง€ ์•Š์€ ์ƒˆ ๋ฉ”์‹œ์ง€๋ฅผ ์˜๋ฏธํ•œ๋‹ค.

Pub/Sub vs Stream ๋น„๊ต

ํ•ญ๋ชฉPub/SubStream
๋ฉ”์‹œ์ง€ ์œ ์ง€X (์ „๋‹ฌ ํ›„ ์‚ฌ๋ผ์ง)O (append-only ๋กœ๊ทธ)
์˜คํ”„๋ผ์ธ ์†Œ๋น„์ž๋ฉ”์‹œ์ง€ ์œ ์‹ค์žฌ์—ฐ๊ฒฐ ํ›„ ์žฌ์ฒ˜๋ฆฌ ๊ฐ€๋Šฅ
์†Œ๋น„์ž ๊ทธ๋ฃนXO
ACK ์ง€์›XO
์žฌ์ƒ(Replay)XO
์‚ฌ์šฉ ์‚ฌ๋ก€์‹ค์‹œ๊ฐ„ ์•Œ๋ฆผ, ์ฑ„ํŒ…์ด๋ฒคํŠธ ๋กœ๊ทธ, ์ฃผ๋ฌธ ์ฒ˜๋ฆฌ

Stream ํŠธ๋ฆฌ๋ฐ

Stream์€ ๊ณ„์† ์Œ“์ด๋ฏ€๋กœ ํฌ๊ธฐ๋ฅผ ์ œํ•œํ•ด์•ผ ํ•  ๋•Œ๊ฐ€ ์žˆ๋‹ค.

# ์ตœ๋Œ€ 1000๊ฐœ ์œ ์ง€ (์ดˆ๊ณผ๋ถ„ ์ž๋™ ์‚ญ์ œ)
XADD orders MAXLEN 1000 * product "mouse" price "30"

# ๋ช…์‹œ์  ํŠธ๋ฆฌ๋ฐ
XTRIM orders MAXLEN 1000

MAXLEN ~ (ํ‹ธ๋‹ค ์‚ฌ์šฉ)์œผ๋กœ ๊ทผ์‚ฌ๊ฐ’ ํŠธ๋ฆฌ๋ฐ์„ ํ•˜๋ฉด ์„ฑ๋Šฅ์ด ๋” ์ข‹๋‹ค.

XADD orders MAXLEN ~ 1000 * product "keyboard" price "80"

์‹ค์‚ฌ์šฉ ์‹œ๋‚˜๋ฆฌ์˜ค

์‹ค์‹œ๊ฐ„ ์ฑ„ํŒ… โ†’ Pub/Sub ์ ํ•ฉ

import redis

r = redis.Redis()

# ์ฑ„ํŒ… ์„œ๋ฒ„
def send_message(channel, message):
    r.publish(channel, message)

# ์ฑ„ํŒ… ํด๋ผ์ด์–ธํŠธ
def listen(channel):
    pubsub = r.pubsub()
    pubsub.subscribe(channel)
    for message in pubsub.listen():
        if message['type'] == 'message':
            print(message['data'])

์ฃผ๋ฌธ ์ด๋ฒคํŠธ ์ฒ˜๋ฆฌ โ†’ Stream ์ ํ•ฉ

import redis

r = redis.Redis()

# ์ฃผ๋ฌธ ๋ฐœํ–‰
def place_order(product, price):
    r.xadd('orders', {'product': product, 'price': str(price)})

# ์ฃผ๋ฌธ ์ฒ˜๋ฆฌ ์›Œ์ปค
def process_orders():
    r.xgroup_create('orders', 'order-group', '$', mkstream=True)
    while True:
        messages = r.xreadgroup('order-group', 'worker-1', {'orders': '>'}, count=10)
        for stream, msgs in messages:
            for msg_id, data in msgs:
                print(f"์ฒ˜๋ฆฌ ์ค‘: {data}")
                # ์ฒ˜๋ฆฌ ์™„๋ฃŒ ์‹œ ACK
                r.xack('orders', 'order-group', msg_id)

์ •๋ฆฌ

  • Pub/Sub: ๋น ๋ฅด๊ณ  ๋‹จ์ˆœํ•˜์ง€๋งŒ ๋ฉ”์‹œ์ง€๊ฐ€ ํœ˜๋ฐœ๋œ๋‹ค. ์‹ค์‹œ๊ฐ„ ์•Œ๋ฆผ์ด๋‚˜ ์บ์‹œ ๋ฌดํšจํ™” ๋ธŒ๋กœ๋“œ์บ์ŠคํŠธ์— ์ ํ•ฉ
  • Stream: ๋ฉ”์‹œ์ง€ ์˜์†์„ฑ๊ณผ ์†Œ๋น„์ž ๊ทธ๋ฃน์„ ์ง€์›ํ•ด Kafka ๊ฐ™์€ ์‚ฌ์šฉ ์‚ฌ๋ก€์— ์ ํ•ฉํ•˜์ง€๋งŒ, Kafka๋งŒํผ ํ™•์žฅ์„ฑ์ด ๋†’์ง€๋Š” ์•Š๋‹ค
  • ๋ฉ”์‹œ์ง€ ์œ ์‹ค์ด ํ—ˆ์šฉ๋˜์ง€ ์•Š๋Š” ์„œ๋น„์Šค๋ผ๋ฉด Pub/Sub ๋Œ€์‹  Stream์„ ์“ฐ๋Š” ๊ฒŒ ๋งž๋‹ค