Source code for forestmq.session
from urllib.parse import urljoin
import json
import httpx
from forestmq.logger import logger
[docs]
class Session:
"""
Session class for handling synchronous and asynchronous HTTP communication
with a ForestMQ server.
This class wraps `httpx` to send messages to a configured ForestMQ endpoint.
Example usage:
from forestmq.session import Session
session = Session(domain="http://localhost:8005", path="/provider")
response = session.send_msg_sync(message={"key": "value"})
print(response.status_code)
"""
domain: str
path: str
url: str
headers: dict
client: httpx.Client
def __init__(self, *, domain: str, path: str):
"""
Initialize the Session instance.
:param domain: The base URL of the ForestMQ server (e.g., "http://localhost:8005").
:param path: The endpoint path to send messages to (e.g., "/provider").
"""
self.domain = domain
self.path = path
self.client = httpx.Client()
self.headers = {
"Content-Type": "application/json",
}
self.url = urljoin(self.domain, self.path)
[docs]
def send_msg_sync(self, *, message: dict) -> httpx.Response:
"""
Send a message synchronously to the ForestMQ server.
:param message: The message dictionary to send as JSON.
:return: The response from the ForestMQ server.
Example:
session = Session(domain="http://localhost:8005", path="/provider")
response = session.send_msg_sync(message={"name": "Sync message"})
print(response.json())
"""
response = self.client.post(
url=self.url,
json=message,
headers=self.headers,
)
response.raise_for_status()
return response
[docs]
async def send_msg(self, *, message: dict) -> httpx.Response:
"""
Send a message asynchronously to the ForestMQ server.
:param message: The message dictionary to send as JSON.
:return: The async response from the ForestMQ server.
Example:
import asyncio
async def main():
session = Session(domain="http://localhost:8005", path="/provider")
response = await session.send_msg(message={"name": "Async message!"})
print(response.json())
asyncio.run(main())
"""
async with httpx.AsyncClient() as client:
response = await client.post(
url=self.url,
json=message,
)
response.raise_for_status()
return response
[docs]
def close(self):
"""
Close the internal httpx.Client.
:return: None
"""
self.client.close()