[Daily morning study] Redis Pub/Sub๊ณผ Stream ํ์ฉ
#daily morning study
Redis Pub/Sub๊ณผ Stream ํ์ฉ
Pub/Sub์ด๋
Pub/Sub(Publish/Subscribe)์ ๋ฉ์์ง๋ฅผ ๋ฐํ(publish)ํ๋ ์ชฝ๊ณผ ๊ตฌ๋ (subscribe)ํ๋ ์ชฝ์ ๋ถ๋ฆฌํ๋ ๋ฉ์์ง ํจํด์ด๋ค.
- Publisher: ํน์ ์ฑ๋์ ๋ฉ์์ง๋ฅผ ๋ณด๋ธ๋ค
- Subscriber: ์ฑ๋์ ๊ตฌ๋ ํ๊ณ , ๋ฉ์์ง๊ฐ ์ค๋ฉด ์ฆ์ ๋ฐ๋๋ค
- Redis๊ฐ ์ค๊ฐ ๋ธ๋ก์ปค ์ญํ ์ ํ๋ค
Publisher โ [Redis Channel] โ Subscriber 1
โ Subscriber 2
โ Subscriber N
Redis Pub/Sub ๊ธฐ๋ณธ ๋ช ๋ น์ด
# ์ฑ๋ ๊ตฌ๋
SUBSCRIBE chat-room
# ํจํด์ผ๋ก ๊ตฌ๋
(glob ํจํด ์ง์)
PSUBSCRIBE chat-*
# ๋ฉ์์ง ๋ฐํ
PUBLISH chat-room "์๋
ํ์ธ์"
# ํ์ฌ ํ์ฑ ์ฑ๋ ๋ชฉ๋ก
PUBSUB CHANNELS
# ์ฑ๋์ ๊ตฌ๋
์ ์
PUBSUB NUMSUB chat-room
Pub/Sub์ ํ๊ณ
Redis Pub/Sub์ fire-and-forget ๋ฐฉ์์ด๋ค. ์ด ๋๋ฌธ์ ๋ช ๊ฐ์ง ์ ์ฝ์ด ์๋ค.
| ํ๊ณ | ์ค๋ช |
|---|---|
| ๋ฉ์์ง ์ ์ค | ๊ตฌ๋ ์๊ฐ ์คํ๋ผ์ธ์ผ ๋ ๋ฐํ๋ ๋ฉ์์ง๋ ์ฌ๋ผ์ง๋ค |
| ์ฌ์ ์ก ๋ถ๊ฐ | ํ ๋ฒ ๋ฐํ๋ ๋ฉ์์ง๋ฅผ ๋ค์ ๋ฐ์ ์ ์๋ค |
| ์๋น ํ์ธ ์์ | ๊ตฌ๋ ์๊ฐ ์ค์ ๋ก ์ฒ๋ฆฌํ๋์ง ์ ์ ์๋ค |
| ๋ฉ์์ง ์์์ฑ ์์ | Redis ์ฌ์์ ์ ๋ชจ๋ ์์ค๋๋ค |
์ด๋ฌํ ํ๊ณ๋ฅผ ๊ทน๋ณตํ๊ธฐ ์ํด ๋์จ ๊ฒ์ด Redis Stream์ด๋ค.
Redis Stream์ด๋
Redis 5.0์์ ์ถ๊ฐ๋ ์๋ฃ๊ตฌ์กฐ๋ก, Kafka์ ์ ์ฌํ append-only log ๋ฐฉ์์ ๋ฉ์์ง ์คํธ๋ฆผ์ด๋ค.
- ๋ฉ์์ง๊ฐ ๋์คํฌ์ ์์์ ์ผ๋ก ์ ์ฅ๋๋ค
- ์๋น์ ๊ทธ๋ฃน(Consumer Group)์ ํตํด ๋ฉ์์ง ์ฒ๋ฆฌ ์ํ๋ฅผ ์ถ์ ํ ์ ์๋ค
- ์๋น์๊ฐ ์คํ๋ผ์ธ์ด์๋ค๊ฐ ๋ค์ ์ฐ๊ฒฐํด๋ ๋์น ๋ฉ์์ง๋ฅผ ์ฝ์ ์ ์๋ค
Stream ๊ธฐ๋ณธ ๋ช ๋ น์ด
# ๋ฉ์์ง ์ถ๊ฐ (ID๋ ์๋ ์์ฑ: ํ์์คํฌํ-์ํ์ค)
XADD orders * product "laptop" price "1200"
# ๊ฒฐ๊ณผ: "1716710400000-0"
# ๋ฒ์ ์กฐํ (์ฒ์๋ถํฐ ๋๊น์ง)
XRANGE orders - +
# ์ต์ ๋ฉ์์ง๋ถํฐ ์ญ์ ์กฐํ
XREVRANGE orders + - COUNT 5
# ์คํธ๋ฆผ ๊ธธ์ด
XLEN orders
# ํน์ ID ์ดํ ๋ฉ์์ง ์ฝ๊ธฐ (์ ๋ฉ์์ง ๋๊ธฐ, ๋ธ๋กํน)
XREAD COUNT 10 BLOCK 0 STREAMS orders 1716710400000-0
Consumer Group
์ฌ๋ฌ ์๋น์๊ฐ ๊ฐ์ ์คํธ๋ฆผ์ ๋ถ์ฐ ์ฒ๋ฆฌํ ๋ ์ฌ์ฉํ๋ค. ๊ฐ ๋ฉ์์ง๋ ๊ทธ๋ฃน ๋ด ํ๋์ ์๋น์์๊ฒ๋ง ์ ๋ฌ๋๋ค.
# Consumer Group ์์ฑ (0๋ถํฐ ์์ = ์ฒ์๋ถํฐ)
XGROUP CREATE orders order-group $ MKSTREAM
# Consumer Group์ผ๋ก ๋ฉ์์ง ์ฝ๊ธฐ
XREADGROUP GROUP order-group consumer-1 COUNT 5 STREAMS orders >
# ๋ฉ์์ง ์ฒ๋ฆฌ ์๋ฃ ํ์ธ (ACK)
XACK orders order-group 1716710400000-0
# ์ฒ๋ฆฌ ์ค์ธ(ACK ์ ๋) ๋ฉ์์ง ๋ชฉ๋ก
XPENDING orders order-group - + 10
> ๊ธฐํธ๋ ์์ง ์๋ฌด ์๋น์์๊ฒ๋ ์ ๋ฌ๋์ง ์์ ์ ๋ฉ์์ง๋ฅผ ์๋ฏธํ๋ค.
Pub/Sub vs Stream ๋น๊ต
| ํญ๋ชฉ | Pub/Sub | Stream |
|---|---|---|
| ๋ฉ์์ง ์ ์ง | X (์ ๋ฌ ํ ์ฌ๋ผ์ง) | O (append-only ๋ก๊ทธ) |
| ์คํ๋ผ์ธ ์๋น์ | ๋ฉ์์ง ์ ์ค | ์ฌ์ฐ๊ฒฐ ํ ์ฌ์ฒ๋ฆฌ ๊ฐ๋ฅ |
| ์๋น์ ๊ทธ๋ฃน | X | O |
| ACK ์ง์ | X | O |
| ์ฌ์(Replay) | X | O |
| ์ฌ์ฉ ์ฌ๋ก | ์ค์๊ฐ ์๋ฆผ, ์ฑํ | ์ด๋ฒคํธ ๋ก๊ทธ, ์ฃผ๋ฌธ ์ฒ๋ฆฌ |
Stream ํธ๋ฆฌ๋ฐ
Stream์ ๊ณ์ ์์ด๋ฏ๋ก ํฌ๊ธฐ๋ฅผ ์ ํํด์ผ ํ ๋๊ฐ ์๋ค.
# ์ต๋ 1000๊ฐ ์ ์ง (์ด๊ณผ๋ถ ์๋ ์ญ์ )
XADD orders MAXLEN 1000 * product "mouse" price "30"
# ๋ช
์์ ํธ๋ฆฌ๋ฐ
XTRIM orders MAXLEN 1000
MAXLEN ~ (ํธ๋ค ์ฌ์ฉ)์ผ๋ก ๊ทผ์ฌ๊ฐ ํธ๋ฆฌ๋ฐ์ ํ๋ฉด ์ฑ๋ฅ์ด ๋ ์ข๋ค.
XADD orders MAXLEN ~ 1000 * product "keyboard" price "80"
์ค์ฌ์ฉ ์๋๋ฆฌ์ค
์ค์๊ฐ ์ฑํ โ Pub/Sub ์ ํฉ
import redis
r = redis.Redis()
# ์ฑํ
์๋ฒ
def send_message(channel, message):
r.publish(channel, message)
# ์ฑํ
ํด๋ผ์ด์ธํธ
def listen(channel):
pubsub = r.pubsub()
pubsub.subscribe(channel)
for message in pubsub.listen():
if message['type'] == 'message':
print(message['data'])
์ฃผ๋ฌธ ์ด๋ฒคํธ ์ฒ๋ฆฌ โ Stream ์ ํฉ
import redis
r = redis.Redis()
# ์ฃผ๋ฌธ ๋ฐํ
def place_order(product, price):
r.xadd('orders', {'product': product, 'price': str(price)})
# ์ฃผ๋ฌธ ์ฒ๋ฆฌ ์์ปค
def process_orders():
r.xgroup_create('orders', 'order-group', '$', mkstream=True)
while True:
messages = r.xreadgroup('order-group', 'worker-1', {'orders': '>'}, count=10)
for stream, msgs in messages:
for msg_id, data in msgs:
print(f"์ฒ๋ฆฌ ์ค: {data}")
# ์ฒ๋ฆฌ ์๋ฃ ์ ACK
r.xack('orders', 'order-group', msg_id)
์ ๋ฆฌ
- Pub/Sub: ๋น ๋ฅด๊ณ ๋จ์ํ์ง๋ง ๋ฉ์์ง๊ฐ ํ๋ฐ๋๋ค. ์ค์๊ฐ ์๋ฆผ์ด๋ ์บ์ ๋ฌดํจํ ๋ธ๋ก๋์บ์คํธ์ ์ ํฉ
- Stream: ๋ฉ์์ง ์์์ฑ๊ณผ ์๋น์ ๊ทธ๋ฃน์ ์ง์ํด Kafka ๊ฐ์ ์ฌ์ฉ ์ฌ๋ก์ ์ ํฉํ์ง๋ง, Kafka๋งํผ ํ์ฅ์ฑ์ด ๋์ง๋ ์๋ค
- ๋ฉ์์ง ์ ์ค์ด ํ์ฉ๋์ง ์๋ ์๋น์ค๋ผ๋ฉด Pub/Sub ๋์ Stream์ ์ฐ๋ ๊ฒ ๋ง๋ค