-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathfilling.py
More file actions
53 lines (41 loc) · 1.57 KB
/
filling.py
File metadata and controls
53 lines (41 loc) · 1.57 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
from kafka import KafkaProducer
import json
import time
# ✅ Kafka 설정
KAFKA_BROKER = "localhost:9092" # Kafka 브로커 주소
TOPIC = "filling-notice" # ✅ 공시 알림용 Kafka 토픽
RETRY_DELAY = 5 # ✅ 재시도 대기 시간 (초)
def create_kafka_producer():
"""📌 Kafka Producer 생성 (무한 재시도)"""
while True:
try:
producer = KafkaProducer(
bootstrap_servers=[KAFKA_BROKER],
value_serializer=lambda v: json.dumps(v).encode('utf-8')
)
print("✅ Kafka Producer 연결 성공!")
return producer
except Exception as e:
print(f"❌ Kafka Producer 연결 실패: {e}")
print(f"⏳ {RETRY_DELAY}초 후 다시 시도...")
time.sleep(RETRY_DELAY)
# ✅ Kafka Producer 생성 (무한 재시도)
producer = create_kafka_producer()
def send_kafka_notification(ticker, filling_type):
"""📩 Kafka로 종목과 공시 유형 전송 (Producer) - 무한 재시도 기능 포함"""
message = {
"ticker": ticker,
"filling_type": filling_type
}
while True:
try:
producer.send(TOPIC, message)
producer.flush()
print(f"📩 Kafka 메시지 전송 완료: {message}")
return # ✅ 성공하면 함수 종료
except Exception as e:
print(f"❌ Kafka 메시지 전송 실패: {e}")
print(f"⏳ {RETRY_DELAY}초 후 다시 시도...")
time.sleep(RETRY_DELAY)
# ✅ 예제 실행
send_kafka_notification("AAPL", "8-K")