+
Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/aleph/chains/chain_data_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,7 @@ async def make_pending_tx_exchange(config: Config) -> aio_pika.abc.AbstractExcha
port=config.rabbitmq.port.value,
login=config.rabbitmq.username.value,
password=config.rabbitmq.password.value,
heartbeat=config.rabbitmq.heartbeat.value,
)
channel = await mq_conn.channel()
pending_tx_exchange = await channel.declare_exchange(
Expand Down
2 changes: 2 additions & 0 deletions src/aleph/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,8 @@ def get_defaults():
"pending_message_exchange": "aleph-pending-messages",
# Name of the RabbitMQ exchange used for sync/message events (input of the TX processor).
"pending_tx_exchange": "aleph-pending-txs",
# RabbitMQ heartbeat timeout, in seconds. Set high so that connections are not closed as timed out during long operations.
"heartbeat": 600,
},
"redis": {
# Hostname of the Redis service.
Expand Down
1 change: 1 addition & 0 deletions src/aleph/jobs/job_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ async def _make_pending_queue(
port=config.rabbitmq.port.value,
login=config.rabbitmq.username.value,
password=config.rabbitmq.password.value,
heartbeat=config.rabbitmq.heartbeat.value,
)
channel = await mq_conn.channel()

Expand Down
8 changes: 7 additions & 1 deletion src/aleph/jobs/process_pending_messages.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,14 @@ async def new(
mq_password: str,
message_exchange_name: str,
pending_message_exchange_name: str,
mq_heartbeat: int,
):
mq_conn = await aio_pika.connect_robust(
host=mq_host, port=mq_port, login=mq_username, password=mq_password
host=mq_host,
port=mq_port,
login=mq_username,
password=mq_password,
heartbeat=mq_heartbeat,
)
channel = await mq_conn.channel()
mq_message_exchange = await channel.declare_exchange(
Expand Down Expand Up @@ -179,6 +184,7 @@ async def fetch_and_process_messages_task(config: Config):
mq_password=config.rabbitmq.password.value,
message_exchange_name=config.rabbitmq.message_exchange.value,
pending_message_exchange_name=config.rabbitmq.pending_message_exchange.value,
mq_heartbeat=config.rabbitmq.heartbeat.value,
)

async with pending_message_processor:
Expand Down
1 change: 1 addition & 0 deletions src/aleph/toolkit/rabbitmq.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,6 @@ async def make_mq_conn(config) -> aio_pika.abc.AbstractConnection:
port=config.rabbitmq.port.value,
login=config.rabbitmq.username.value,
password=config.rabbitmq.password.value,
heartbeat=config.rabbitmq.heartbeat.value,
)
return mq_conn
Loading
点击 这是indexloc提供的php浏览器服务,不要输入任何密码和下载