ForestMQ API Reference

class forestmq.ForestMQ(*, domain, interval=1)[source]

Bases: object

Parameters:
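  • domain (str) – The domain or IP where the ForestMQ server is hosted.

  • interval (int) – Polling interval in seconds. Defaults to 1.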

consumer: Consumer
domain: str
interval: int
provider: Provider
class forestmq.consumer.Consumer(*, domain, interval)[source]

Bases: object

ForestMQ Consumer for polling messages from a queue endpoint.

Periodically polls the ForestMQ server at the given interval and invokes a user-defined callback with the message content.

Example

    import asyncio
    from forestmq import ForestMQ

    def callback(message: dict) -> None:
        print(f"Consumer message: {message['message']}")

    fmq = ForestMQ(domain="http://localhost:8005", interval=1)
    asyncio.run(fmq.consumer.poll(callback))

Parameters:
  • domain (str) – The domain or IP where the ForestMQ server is hosted.

  • interval (int) – Polling interval in seconds.

domain: str
interval: int
async poll(callback)[source]

Begin polling the ForestMQ server and invoke the callback with each message.

Example usage:

    import asyncio
    from forestmq import ForestMQ

    def callback(message: dict) -> None:
        print(f"Consumer message: {message['message']}")

    if __name__ == "__main__":
        fmq = ForestMQ(domain="http://localhost:8005", interval=1)
        asyncio.run(fmq.consumer.poll(callback))

Parameters:

callback (Callable[[dict], None]) – A function that takes a dict message and returns None.

Raises:

ConsumerError – If the HTTP request fails.

Return type:

None
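The polling behaviour described above can be pictured with a small, self-contained sketch. This is not ForestMQ's implementation: the `fetch` callable stands in for the HTTP request, and `max_polls` is a hypothetical parameter added only so the example terminates.

```python
import asyncio
from typing import Awaitable, Callable, Optional


async def poll_loop(
    fetch: Callable[[], Awaitable[Optional[dict]]],
    callback: Callable[[dict], None],
    interval: float,
    max_polls: int,
) -> int:
    """Call fetch() once per interval and hand each message to callback.

    `fetch` and `max_polls` are hypothetical stand-ins, not ForestMQ names.
    Returns the number of messages delivered to the callback.
    """
    delivered = 0
    for _ in range(max_polls):
        message = await fetch()
        if message is not None:  # an empty queue yields nothing to deliver
            callback(message)
            delivered += 1
        await asyncio.sleep(interval)
    return delivered


def run_demo() -> list:
    """Drive the loop with an in-memory queue instead of a real server."""
    pending = [{"message": "first"}, {"message": "second"}]
    received = []

    async def fake_fetch() -> Optional[dict]:
        return pending.pop(0) if pending else None

    asyncio.run(poll_loop(fake_fetch, received.append, interval=0.01, max_polls=3))
    return received
```

Calling `run_demo()` collects the two queued messages in order. The documented `poll()` takes only the callback, so any stop condition in real code would have to come from the caller, for example by cancelling the task.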

static set_headers()[source]

Set HTTP headers for the polling request.

Returns:

Dictionary with Content-Type header.

Return type:

dict

class forestmq.provider.Protocol(value)[source]

Bases: Enum

Supported communication protocols for ForestMQ.

Variables:
  • TCP – Use the HTTP-based TCP interface.

  • AMQP – (Planned) Use the AMQP protocol.

AMQP = 2
TCP = 1
class forestmq.provider.Provider(*, protocol, domain)[source]

Bases: object

Provider for sending messages to ForestMQ.

This class abstracts synchronous and asynchronous message delivery to a ForestMQ server via the TCP protocol.

Parameters:
  • protocol (Protocol) – Communication protocol (only TCP is currently supported).

  • domain (str) – Full domain (e.g., http://localhost:8005) where ForestMQ is running.

Raises:

ProviderError – If a non-supported protocol is provided.
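As a rough illustration of the protocol check described above, the sketch below redeclares the documented enum values locally and rejects anything other than TCP. The `check_protocol` and `is_supported` helpers and the local `ProviderError` class are assumptions made for this example; the library's own validation may differ.

```python
from enum import Enum


class Protocol(Enum):
    """Local copy of the documented values, for illustration only."""

    TCP = 1
    AMQP = 2


class ProviderError(Exception):
    """Stand-in for forestmq.exceptions.ProviderError."""


def is_supported(protocol: Protocol) -> bool:
    """Hypothetical helper: only TCP is documented as currently supported."""
    return protocol is Protocol.TCP


def check_protocol(protocol: Protocol) -> Protocol:
    """Hypothetical helper: return the protocol or raise ProviderError."""
    if not is_supported(protocol):
        raise ProviderError(f"Unsupported protocol: {protocol.name}")
    return protocol
```

With this sketch, `check_protocol(Protocol.TCP)` returns the member unchanged, while `check_protocol(Protocol.AMQP)` raises the stand-in `ProviderError`.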

domain: str
protocol: Protocol
async send_msg(message)[source]

Send a message asynchronously to the ForestMQ provider endpoint.

Parameters:

message (dict) – A dictionary representing the message payload.

Returns:

JSON response from the server as a string.

Raises:

SessionError – If the async HTTP request fails.

Return type:

str
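Because `send_msg` is a coroutine, several messages can be sent concurrently with `asyncio.gather`. The sketch below uses a hypothetical `FakeProvider` whose `send_msg` mirrors the documented signature and return type but performs no network I/O; the reply format is invented for the example.

```python
import asyncio
import json


class FakeProvider:
    """Hypothetical stand-in whose send_msg mirrors the documented signature."""

    async def send_msg(self, message: dict) -> str:
        await asyncio.sleep(0)  # yield to the event loop, as real I/O would
        return json.dumps({"received": message})  # invented reply format


async def send_all(provider, messages: list) -> list:
    """Send every message concurrently and return replies in input order."""
    return await asyncio.gather(*(provider.send_msg(m) for m in messages))


replies = asyncio.run(send_all(FakeProvider(), [{"n": 1}, {"n": 2}]))
```

`asyncio.gather` returns results in the order the awaitables were passed, so `replies[0]` corresponds to the first message regardless of which request finishes first.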

send_msg_sync(message)[source]

Send a message synchronously to the ForestMQ provider endpoint.

Parameters:

message (dict) – A dictionary representing the message payload.

Returns:

JSON response from the server as a string.

Raises:

SessionError – If the HTTP request fails.

Return type:

str
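Since both provider methods are documented as returning the JSON response as a string, callers will usually decode it before use. A minimal sketch, assuming the reply is a JSON object; the sample body is hypothetical, because the server's actual reply fields are not documented here.

```python
import json


def parse_reply(raw: str) -> dict:
    """Decode a JSON string reply; raise ValueError if it is not a JSON object."""
    data = json.loads(raw)  # json.JSONDecodeError is a subclass of ValueError
    if not isinstance(data, dict):
        raise ValueError(f"Expected a JSON object, got {type(data).__name__}")
    return data


# Hypothetical reply body, standing in for the string returned by send_msg_sync().
sample = '{"status": "ok"}'
reply = parse_reply(sample)
```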

session: Session
class forestmq.session.Session(*, domain, path)[source]

Bases: object

Session class for handling synchronous and asynchronous HTTP communication with a ForestMQ server.

This class wraps httpx to send messages to a configured ForestMQ endpoint.

Example usage:

    from forestmq.session import Session

    session = Session(domain="http://localhost:8005", path="/provider")
    response = session.send_msg_sync(message={"key": "value"})
    print(response.status_code)

Parameters:
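  • domain (str) – Full domain (e.g., http://localhost:8005) where ForestMQ is running.

  • path (str) – Endpoint path on the server (e.g., /provider).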

client: Client
close()[source]

Close the internal httpx.Client.

Returns:

None
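Any object with a `close()` method can be wrapped in `contextlib.closing` so that cleanup runs even when the body raises. The sketch below shows the pattern with a hypothetical `FakeSession`; it only imitates the documented `send_msg_sync` and `close` method names and makes no HTTP request.

```python
from contextlib import closing


class FakeSession:
    """Hypothetical stand-in exposing the same close() method as Session."""

    def __init__(self) -> None:
        self.closed = False

    def send_msg_sync(self, *, message: dict) -> dict:
        # Echo the payload instead of performing an HTTP request.
        return {"echo": message}

    def close(self) -> None:
        self.closed = True


session = FakeSession()
with closing(session) as s:
    result = s.send_msg_sync(message={"name": "Sync message"})
# closing() has called session.close() by this point, even if the body raised.
```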

domain: str
headers: dict
path: str
async send_msg(*, message)[source]

Send a message asynchronously to the ForestMQ server.

Parameters:

message (dict) – The message dictionary to send as JSON.

Returns:

The async response from the ForestMQ server.

Return type:

Response

Example

    import asyncio
    from forestmq.session import Session

    async def main():
        session = Session(domain="http://localhost:8005", path="/provider")
        response = await session.send_msg(message={"name": "Async message!"})
        print(response.json())

    asyncio.run(main())

send_msg_sync(*, message)[source]

Send a message synchronously to the ForestMQ server.

Parameters:

message (dict) – The message dictionary to send as JSON.

Returns:

The response from the ForestMQ server.

Return type:

Response

Example

    from forestmq.session import Session

    session = Session(domain="http://localhost:8005", path="/provider")
    response = session.send_msg_sync(message={"name": "Sync message"})
    print(response.json())

url: str
exception forestmq.exceptions.ConsumerError[source]

Raised when a consumer encounters an error during message polling or handling.

Raises:

ConsumerError – if a message cannot be consumed or an internal failure occurs within the consumer.

exception forestmq.exceptions.ForestMQError[source]

Base exception class for all ForestMQ-related errors.

All other custom exceptions inherit from this.

exception forestmq.exceptions.ProviderError[source]

Raised when the provider fails to send a message or is misconfigured.

Raises:

ProviderError – if message delivery or provider setup fails.

exception forestmq.exceptions.SessionError[source]

Raised when an HTTP session or request fails during communication with the ForestMQ server.

Raises:

SessionError – if there is a transport-level failure in sending or receiving data via HTTP/HTTPS.
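Because every custom exception inherits from `ForestMQError`, callers can handle a specific failure first and fall back to the base class for the rest. The sketch below redeclares the hierarchy locally so it runs without the library; deriving the base from `Exception`, and the `handle` and `fail_with` helpers, are assumptions made for the example.

```python
from typing import Callable


class ForestMQError(Exception):
    """Stand-in for the documented base class (Exception base is assumed)."""


class ConsumerError(ForestMQError):
    """Stand-in for forestmq.exceptions.ConsumerError."""


class ProviderError(ForestMQError):
    """Stand-in for forestmq.exceptions.ProviderError."""


class SessionError(ForestMQError):
    """Stand-in for forestmq.exceptions.SessionError."""


def handle(action: Callable[[], None]) -> str:
    """Run action, catching the specific error before the shared base class."""
    try:
        action()
    except SessionError:
        return "transport failure"
    except ForestMQError as exc:
        return f"forestmq failure: {type(exc).__name__}"
    return "ok"


def fail_with(exc_type: type) -> Callable[[], None]:
    """Hypothetical helper: build an action that raises the given exception."""
    def action() -> None:
        raise exc_type("boom")
    return action
```

Ordering matters here: the `SessionError` clause must come before the `ForestMQError` clause, otherwise the base-class handler would catch it first.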