Source code for forestmq.provider
from enum import Enum
import httpx
from forestmq.session import Session
from forestmq.exceptions import ProviderError, SessionError
from forestmq.logger import logger
[docs]
class Protocol(Enum):
"""
Supported communication protocols for ForestMQ.
:cvar TCP: Use the HTTP-based TCP interface.
:cvar AMQP: (Planned) Use the AMQP protocol.
"""
TCP = 1
AMQP = 2 # TODO
[docs]
class Provider:
"""
Provider for sending messages to ForestMQ.
This class abstracts synchronous and asynchronous message delivery
to a ForestMQ server via the TCP protocol.
:param protocol: Communication protocol (only TCP is currently supported).
:param domain: Full domain (e.g., http://localhost:8005) where ForestMQ is running.
:raises ProviderError: If a non-supported protocol is provided.
"""
protocol: Protocol
session: Session
domain: str
def __init__(self, *, protocol: Protocol, domain: str):
"""
Initialize the Provider with the given protocol and domain.
:param protocol: Communication protocol (must be Protocol.TCP).
:param domain: Base domain or IP of the ForestMQ server.
:raises ProviderError: If protocol is not TCP.
"""
self.protocol = protocol
self.domain = domain
if self.protocol == Protocol.TCP:
logger.debug("Using TCP protocol")
self.session = Session(
domain=domain,
path="/provider",
)
else:
raise ProviderError("ForestMQ Error: Must use TCP protocol")
[docs]
def send_msg_sync(self, message: dict) -> str:
"""
Send a message synchronously to the ForestMQ provider endpoint.
:param message: A dictionary representing the message payload.
:return: JSON response from the server as a string.
:raises SessionError: If the HTTP request fails.
"""
data = {
"destroy": False,
"message": message,
}
try:
resp = self.session.send_msg_sync(message=data)
self.session.close()
return resp.json()
except httpx.RequestError as e:
raise SessionError(f"FORESTMQ Error: Failed to send message in session") from e
[docs]
async def send_msg(self, message: dict) -> str:
"""
Send a message asynchronously to the ForestMQ provider endpoint.
:param message: A dictionary representing the message payload.
:return: JSON response from the server as a string.
:raises SessionError: If the async HTTP request fails.
"""
data = {
"destroy": False,
"message": message,
}
try:
resp = await self.session.send_msg(message=data)
return resp.json()
except httpx.RequestError as e:
raise SessionError(f"FORESTMQ Error: Failed to send message in session") from e