67 lines
2.2 KiB
Python
67 lines
2.2 KiB
Python
import json
|
||
import pika
|
||
import os
|
||
from dotenv import load_dotenv
|
||
|
||
load_dotenv()
|
||
|
||
# --- КОНФИГУРАЦИЯ ---
|
||
RABBITMQ_URL = os.getenv("RABBITMQ_URL", "amqp://guest:guest@localhost/")
|
||
EXCHANGE_NAME = os.environ.get("RABBITMQ_EXCHANGE", "vstu_schedule")
|
||
|
||
# Параметры фейкового события
|
||
FACULTET = "fevt"
|
||
IS_NEW = False
|
||
|
||
# Формируем структуру состояния (state), которую ожидает консьюмер
|
||
mock_state = {
|
||
"excel": {
|
||
"url": "https://www.vstu.ru/upload/raspisanie/z/ОН_ФЭВТ_TEST.xls",
|
||
"sha1hash": "test_hash_8a7b6c5d4e3f2a1",
|
||
"last_changed": "2026-04-05 10:00:00",
|
||
"uniqpath": "vstu.ru/rasp?dep=fevt/TEST.xls",
|
||
# Важно: этот файл должен реально существовать на сервере fazziclay.com/api/v1/.../parsed/
|
||
# Иначе Летописец не сможет скачать JSON
|
||
"json_represent": "он_фэвт_2 курс.json",
|
||
"facultet": FACULTET
|
||
},
|
||
"sheets": {},
|
||
"actual_at": 1774475864
|
||
}
|
||
|
||
mock_payload = {
|
||
"type": "excel_file_parsed_not_same",
|
||
"is_new": IS_NEW,
|
||
"state": mock_state
|
||
}
|
||
|
||
def send_test_event():
|
||
try:
|
||
print(f"[*] Connecting to RabbitMQ: {RABBITMQ_URL}")
|
||
connection = pika.BlockingConnection(pika.URLParameters(RABBITMQ_URL))
|
||
channel = connection.channel()
|
||
|
||
# Гарантируем, что обменник существует
|
||
channel.exchange_declare(exchange=EXCHANGE_NAME, exchange_type='topic', durable=True)
|
||
|
||
routing_key = f"parser.excel_parsed.{FACULTET}"
|
||
|
||
print(f"[*] Publishing mock event to '{routing_key}'...")
|
||
channel.basic_publish(
|
||
exchange=EXCHANGE_NAME,
|
||
routing_key=routing_key,
|
||
properties=pika.BasicProperties(
|
||
content_type="application/json",
|
||
delivery_mode=2 # Persistent
|
||
),
|
||
body=json.dumps(mock_payload, ensure_ascii=False).encode('utf-8')
|
||
)
|
||
|
||
print("[+] Test event published successfully!")
|
||
connection.close()
|
||
|
||
except Exception as e:
|
||
print(f"[!] Failed to publish test event: {e}")
|
||
|
||
if __name__ == "__main__":
|
||
send_test_event() |