The Python MQbtT library

This module supports the implementation of the optimizer side communicating with a miner (be it ESP-miner running on some ESP-based board, or cgminer running on a Linux machine controlling some USB-connected ASIC miner).

Usage of the library

Usage is simple: first, instantiate a concrete subclass of MQbtTBase (for example MQbtTOptimizer, described below), passing the broker URI and a pair id; the resulting object is then used to communicate with the miner.

The pair id is a string that should be unique for the pair of optimizer and miner, and should be the same for both parties.

class MQbtTBase(broker_uri: str, pair_id: str, receive_subtopic: str)

Base class for all MQbtT classes.

This class is not meant to be used directly, but only as a base class for other classes. It provides the basic functionality to connect to the MQTT broker and to send and receive messages.

Parameters:
  • broker_uri – the URI of the MQTT broker (example: mqtt://localhost)

  • pair_id – the pair id of the client

  • receive_subtopic – the subtopic to receive from (e.g. esp, srv or log)

Note

This class internally uses a Queue: it starts the MQTT client and registers a callback that pushes every message arriving on receive_subtopic onto that queue.

Subclasses use the queue to provide a synchronous way to receive messages via receive_T() methods (where T is the type of message the subclass handles). All send and receive methods take a blocking parameter, True by default, that makes the call block until the message has been sent or received. Alternatively, a has_T() method checks whether there is a message of type T in the queue.

It is essential that users of subclasses keep receiving messages, otherwise the queue grows indefinitely. Once the communication is over, the stop() method must be called to stop the MQTT client and to disconnect from the broker.

stop()

Stop the MQTT client and disconnect from the broker.
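
The queue-based design described in the note above can be pictured with the following minimal sketch. It uses only the standard library; the class name QueueBackedReceiver and its methods are hypothetical stand-ins, not part of the library, and the MQTT client callback is simulated by calling on_message() by hand.

```python
import queue
from typing import Optional


class QueueBackedReceiver:
    """Stand-in for the receiving half of a subclass (hypothetical name)."""

    def __init__(self) -> None:
        # Unbounded FIFO: it keeps growing until the consumer drains it.
        self._queue = queue.Queue()

    def on_message(self, payload: bytes) -> None:
        # In a real client this role is played by the MQTT message callback.
        self._queue.put(payload)

    def has_message(self) -> bool:
        # Non-destructive check, analogous to a has_T() method.
        return not self._queue.empty()

    def receive_message(self, blocking: bool = True) -> Optional[bytes]:
        # Analogous to a receive_T() method: wait for a message when blocking,
        # otherwise return None immediately if nothing is queued.
        try:
            return self._queue.get(block=blocking)
        except queue.Empty:
            return None


receiver = QueueBackedReceiver()
receiver.on_message(b"first")
receiver.on_message(b"second")

# Drain the queue without ever blocking.
drained = []
while receiver.has_message():
    drained.append(receiver.receive_message(blocking=False))
```

Draining in a loop like this is what keeps the queue from growing; a consumer that stops receiving leaves every later message buffered in memory.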

Messages format

The parties exchange messages of various types:

  • Request is sent by the optimizer to the miner,

  • Reply and Share are sent by the miner to the optimizer.

Messages are dataclasses. To reduce the risk of programming errors, they are immutable and their constructors accept only keyword arguments. To create a message, instantiate the desired class, for instance:

request = Request(
  timestamp_min = 3, 
  timestamp_max = 10, 
  nonce_start = 5, 
  nonce_size = 7, 
  reset = False
)

and to access the fields, you can use the dot notation:

request.timestamp_min
request.timestamp_max
request.nonce_start
request.nonce_size
request.reset

Messages are defined as follows. Observe that the from_bytes() and to_bytes() methods are usually not used directly, but are needed by the send() and receive() methods of the subclasses of MQbtTBase to serialize and deserialize the messages for transmission.

class Request(*, timestamp_min: int = 0, timestamp_max: int = 0, nonce_start: int = 0, nonce_size: int = 0, reset: bool = False)

Request message sent by the optimizer to the miner.

Parameters:
  • timestamp_min – Minimum timestamp.

  • timestamp_max – Maximum timestamp.

  • nonce_start – Minimum nonce.

  • nonce_size – Maximum nonce.

  • reset – Whether to reset the state.

static from_bytes(b: bytes) Request

Deserialize bytes into a Request object.

Parameters:

b – The bytes to deserialize.

Returns:

The deserialized Request object.

to_bytes() bytes

Serialize the Request to bytes using protobuf.

Returns:

The serialized protobuf representation of the request.

class Reply(kind: Kind = Kind.RESULT, num_shares: int = 0, new_block: bool = False)

Reply message from the miner to the optimizer.

Parameters:
  • kind – The kind of reply (e.g., RESULT).

  • num_shares – Number of shares in the reply.

  • new_block – Whether a new block was found.

static from_bytes(b: bytes) Reply

Deserialize bytes into a Reply object.

Parameters:

b – The bytes to deserialize.

Returns:

The deserialized Reply object.

to_bytes() bytes

Serialize the Reply to bytes using protobuf.

Returns:

The serialized protobuf representation of the reply.

The Kind enum is defined as

class Kind(*values)

Kind of the reply messages.

BYE = 3
HELLO = 0
RESET = 2
RESULT = 1

Finally, shares are represented by

class Share(version: int, prev_block: bytes, merkle_root: bytes, time: int, bits: int, nonce: int, pool_diff: int, job_id: str, extranonce2: str, diff: float, zeroes: int)

Share message from the miner to the optimizer.

Parameters:
  • version – The version rolled by the miner.

  • prev_block – The hash of the previous block.

  • merkle_root – The bytes of the merkle root.

  • time – The timestamp of the share.

  • bits – The difficulty bits.

  • nonce – The nonce rolled by the miner.

  • pool_diff – The pool difficulty.

  • job_id – The job ID.

  • extranonce2 – The extranonce2.

  • diff – The difficulty of the share.

  • zeroes – The (approximate) number of leading zeroes in the share, computed as floor(log2(diff)).

static from_bytes(b: bytes) Share

Deserialize bytes into a Share object.

Parameters:

b – The bytes to deserialize.

Returns:

The deserialized Share object.

to_bytes() bytes

Serialize the Share to bytes using protobuf.

Returns:

The serialized protobuf representation of the share.
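
As a worked example of the zeroes field, the sketch below evaluates floor(log2(diff)) for the difficulties that appear in the sample log output of the logger example. The helper name approx_leading_zeroes is hypothetical, and rejecting non-positive values is an assumption of this sketch.

```python
import math


def approx_leading_zeroes(diff: float) -> int:
    """Return floor(log2(diff)), the documented approximation for zeroes."""
    if diff <= 0:
        raise ValueError("diff must be positive")
    return math.floor(math.log2(diff))


# Difficulties taken from the sample log output of the logger example.
samples = (19283.4, 995.4, 542.9, 322.8)
zeroes = {diff: approx_leading_zeroes(diff) for diff in samples}
# zeroes == {19283.4: 14, 995.4: 9, 542.9: 9, 322.8: 8}
```

For instance, 19283.4 lies between 2^14 = 16384 and 2^15 = 32768, hence the value 14.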

The MQbtTOptimizer

The central class of the library is the one used to implement the optimizer.

class MQbtTOptimizer(broker_uri: str, pid: str)

Class for the optimizer to send and receive messages to the miner.

has_reply() bool

Check if there is a reply from the miner.

Returns:

True if there is a reply, False otherwise.

has_share() bool

Check if there is a share from the miner.

Returns:

True if there is a share, False otherwise.

receive_reply(blocking: bool = True) Reply | None

Receive a reply from the miner.

Parameters:

blocking – Whether to block until a reply is received.

receive_share(blocking: bool = True) Share | None

Receive a share from the miner.

Parameters:

blocking – Whether to block until a share is received.

send_request(request: Request, blocking: bool = True) None

Send a request to the miner.

Parameters:

blocking – Whether to block until the request is sent.

A sketch of its usage is given by the following fragment of code

from mqbtt import MQbtTOptimizer, Request, Kind

# create the connection
handle = MQbtTOptimizer('cudone.law.di.unimi.it', 'test')

while True:

  # prepare the request
  request = Request(
    timestamp_min= ...,
    timestamp_max= ..., 
    nonce_start = ..., 
    nonce_size = ..., 
    reset=...
  )
  
  # send the request 
  handle.send_request(request)

  ... 

  # receive the reply
  reply = handle.receive_reply()
  
  # process the reply
  if reply.kind == Kind.RESULT:
    ... reply.num_shares ...
    ... reply.new_block ...

# end the communication
handle.stop()

The MQbtTLogReceiver

Receiving logs is very easy.

class MQbtTLogReceiver(broker_uri: str, pid: str)

Class to receive log messages from the MQTT broker.

receive_log(blocking: bool = True) str | None

Receive a log message from the MQTT broker.

Parameters:

blocking – Whether to block until a log message is received.

A sketch of its usage is given by the following fragment of code

from mqbtt import MQbtTLogReceiver

handle = MQbtTLogReceiver('cudone.law.di.unimi.it', 'test')

while True:
  message = handle.receive_log()
  ...

handle.stop()

The MQbtTMiner

For debugging purposes, the library provides a class to implement a (mock) miner.

class MQbtTMiner(broker_uri: str, pid: str)

Class for a (mock) miner to send and receive messages to the optimizer.

receive_request(blocking: bool = True) Request | None

Receive a request from the optimizer.

Parameters:

blocking – Whether to block until a request is received.

send_reply(reply: Reply, blocking: bool = True) None

Send a reply to the optimizer.

Parameters:

blocking – Whether to block until the reply is sent.

A sketch of its usage is symmetrical to that of the optimizer

from mqbtt import MQbtTMiner, Reply, Kind

handle = MQbtTMiner(broker_address, pair_id)

while True:
  
  # receive the request from the optimizer
  request = handle.receive_request()

  # process the request and prepare the reply

  ... request.timestamp_min ...
  ... request.timestamp_max ...
  ... request.nonce_start ...
  ... request.nonce_size ...
  ... request.reset ...

  reply = Reply(
    num_shares= ...,
    new_block= ...,
    kind= Kind...
  )

  # send the reply
  handle.send_reply(reply)

handle.stop()

The MQbtTDataBase class

As seen in the next section, the library has a tool that can record all the traffic in the mqbtt subtopics in a SQLite database with the following schema

TABLE messages (
  id INTEGER PRIMARY KEY AUTOINCREMENT,
  timestamp REAL NOT NULL,
  pair_id TEXT NOT NULL,
  subtopic TEXT NOT NULL CHECK (subtopic IN ('esp', 'log', 'shr', 'srv')),
  message BLOB NOT NULL
);

where timestamp is the time of the message in seconds since the epoch for the UTC timezone, pair_id is the pair id of the optimizer and miner, subtopic is one of the four subtopics of the mqbtt topic, and message is the serialized message (in bytes).
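
The schema can be tried out with the standard sqlite3 module, as in the sketch below. It assumes the statement is a plain CREATE TABLE (the listing above omits the leading keyword) and uses an in-memory database; the sample values are made up for illustration.

```python
import sqlite3
import time

# The schema listed above, with CREATE prepended (an assumption of this sketch).
SCHEMA = """
CREATE TABLE messages (
  id INTEGER PRIMARY KEY AUTOINCREMENT,
  timestamp REAL NOT NULL,
  pair_id TEXT NOT NULL,
  subtopic TEXT NOT NULL CHECK (subtopic IN ('esp', 'log', 'shr', 'srv')),
  message BLOB NOT NULL
);
"""

conn = sqlite3.connect(":memory:")
conn.execute(SCHEMA)

insert = "INSERT INTO messages (timestamp, pair_id, subtopic, message) VALUES (?, ?, ?, ?)"

# A valid row: the payload is stored as opaque bytes.
conn.execute(insert, (time.time(), "test", "log", b"hello"))

# A subtopic outside the four allowed values violates the CHECK constraint.
try:
    conn.execute(insert, (time.time(), "test", "bogus", b"x"))
    rejected = False
except sqlite3.IntegrityError:
    rejected = True

rows = conn.execute("SELECT id, pair_id, subtopic, message FROM messages").fetchall()
conn.close()
```

Only the first row is stored; the CHECK constraint guarantees that every recorded message belongs to one of the four known subtopics.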

Once the traffic is recorded, one can conveniently use the following class to read and deserialize the data from the database for further processing.

class MQbtTDataBase(path: str)

A SQLite database for MQbtT messages.

close()

Close the database connection.

insert(message: MQTTMessage)

Insert an MQTT message into the database (including the binary payload and the UTC timestamp).

records(pair_id: str | None = None, subtopic: str | None = None, min_timestamp: datetime | None = None, max_timestamp: datetime | None = None, desc: bool = False) Generator[MQbtTRecord, None, None]

Query the database for messages, auto-deserializing the message payload.

Parameters:
  • pair_id – The pair_id to filter by.

  • subtopic – The subtopic to filter by.

  • min_timestamp – The minimum timestamp to filter by.

  • max_timestamp – The maximum timestamp to filter by.

  • desc – Whether to order the results in descending timestamp order.

The class is convenient because it can be used to obtain the deserialized messages, each represented by the following class

class MQbtTRecord(*, id: int, pair_id: str, subtopic: str, timestamp: datetime, message: Reply | Share | Request | str)

A (deserialized) record from a MQbtTDataBase.

Parameters:
  • id – The ID of the message in the database.

  • pair_id – The pair_id of the message.

  • subtopic – The subtopic of the message.

  • timestamp – The timestamp of the message recording.

  • message – The deserialized message payload.
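
The optional filters of records() map naturally onto a parametrized SQL query. The sketch below shows one way such a query could be assembled with sqlite3; query_messages is a hypothetical helper, not the library's implementation, it returns raw rows without deserializing the payload, and treating the timestamp bounds as inclusive is an assumption.

```python
import sqlite3
from datetime import datetime, timezone
from typing import Iterator, Optional


def query_messages(
    conn: sqlite3.Connection,
    pair_id: Optional[str] = None,
    subtopic: Optional[str] = None,
    min_timestamp: Optional[datetime] = None,
    max_timestamp: Optional[datetime] = None,
    desc: bool = False,
) -> Iterator[tuple]:
    """Yield raw rows matching the given filters (hypothetical helper)."""
    clauses, params = [], []
    if pair_id is not None:
        clauses.append("pair_id = ?")
        params.append(pair_id)
    if subtopic is not None:
        clauses.append("subtopic = ?")
        params.append(subtopic)
    if min_timestamp is not None:
        # Timestamps are stored as seconds since the epoch.
        clauses.append("timestamp >= ?")
        params.append(min_timestamp.timestamp())
    if max_timestamp is not None:
        clauses.append("timestamp <= ?")
        params.append(max_timestamp.timestamp())
    sql = "SELECT id, timestamp, pair_id, subtopic, message FROM messages"
    if clauses:
        sql += " WHERE " + " AND ".join(clauses)
    sql += " ORDER BY timestamp " + ("DESC" if desc else "ASC")
    yield from conn.execute(sql, params)


# Tiny in-memory database with made-up rows, for illustration only.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE messages (id INTEGER PRIMARY KEY AUTOINCREMENT, timestamp REAL NOT NULL,"
    " pair_id TEXT NOT NULL, subtopic TEXT NOT NULL, message BLOB NOT NULL)"
)
conn.executemany(
    "INSERT INTO messages (timestamp, pair_id, subtopic, message) VALUES (?, ?, ?, ?)",
    [(100.0, "test", "log", b"a"), (200.0, "test", "srv", b"b"), (300.0, "other", "log", b"c")],
)

cutoff = datetime.fromtimestamp(150.0, tz=timezone.utc)
newest_first = [row[0] for row in query_messages(conn, min_timestamp=cutoff, desc=True)]
test_logs = [row[4] for row in query_messages(conn, pair_id="test", subtopic="log")]
```

Passing the values as query parameters rather than formatting them into the SQL string keeps the filters safe against injection, and a generator lets callers iterate over large recordings without loading them all at once.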

The mqbtt_dump command

As a convenient debugging tool, the mqbtt_dump command is provided; its usage is

Usage: mqbtt_dump [OPTIONS]

Options:
  --version                     Show version and exit.
  --broker TEXT                 MQTT URI
  --pid TEXT                    Pair id
  --record PATH                 Path to the record file.
  --mode [log|miner|optimizer]  Operation mode
  --help                        Show this message and exit.

The tool can be used for two purposes: if the --pid parameter is provided, it will inspect traffic, while if the --record parameter is provided, it will record the traffic in a SQLite database.

To inspect the traffic, once the broker URI and pair id are specified, the mqbtt_dump command, acting as the operation mode requires, will report:

  • log: the log messages received from the miner,

  • optimizer: the replies the optimizer receives from the miner,

  • miner: the requests the miner receives from the optimizer.

On the other hand, to record the traffic, once the broker URI and the path of the database file are provided, the mqbtt_dump command will record the traffic in the SQLite database until stopped.

The recorded traffic can then be conveniently read using the MQbtTDataBase class, as shown in the examples/Analysis.ipynb Jupyter notebook.

The example code

The logger example

An example of a logger is provided in the examples/logger.py script. It connects to the broker on cudone.law.di.unimi.it using the pair id test and echoes every log message it receives.

If the test Bitaxe is running, executing

python examples/logger.py

should produce an output similar to

I (78745803) bm1368Module: Job ID: 00, Core: 1/1, Ver: 05942000
I (78745803) asic_result: Ver: 25942000 Nonce DE740102 diff 19283.4 of 4096.
I (78745813) stratum_api: tx: {"id": 2291, "method": "mining.submit", "params": ["bc1qnp980s5fpp8l94p5cvttmtdqy8rvrq74qly2yrfmzkdsntqzlc5qkc4rkq.bitaxe", "29163cc", "13000000", "67dd7a88", "de740102", "05942000"]}
I (78745973) stratum_task: rx: {"id":2291,"error":null,"result":true}
I (78745983) stratum_task: message result accepted
I (78757453) bm1368Module: Job ID: 28, Core: 40/9, Ver: 08352000
I (78757463) asic_result: Ver: 28352000 Nonce 7E540150 diff 542.9 of 4096.
I (78759513) bm1368Module: Job ID: 20, Core: 75/12, Ver: 00A18000
I (78759523) asic_result: Ver: 20A18000 Nonce D1BE0096 diff 322.8 of 4096.
I (78761213) bm1368Module: Job ID: 68, Core: 46/6, Ver: 0416C000
I (78761223) asic_result: Ver: 2416C000 Nonce 9CD0015C diff 995.4 of 4096.

The optimizer example

An example of optimizer is provided in the examples/optimizer.py script. It connects to the broker on cudone.law.di.unimi.it using the pair id of test; it then sends ten requests to the miner

Request(
  timestamp_min=i, timestamp_max=i + 1, 
  nonce_start = i//2, nonce_size = i // 2 + 1, 
  reset=(i % 2 == 0)
)

for i from 0 to 9, and prints the replies.

The echo fake miner

To test the communication of the optimizer, a fake miner is provided that, for every request, returns the following reply

Reply(
  num_shares=(
    request.timestamp_min * 1000000
    + request.timestamp_max * 10000
    + request.nonce_start * 100
    + request.nonce_size
  ),
  new_block=request.timestamp_min % 2 == 0,
  kind=Kind(request.timestamp_min % 4),
)
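
The arithmetic of the echo reply can be checked offline with the sketch below, which applies the formula above to the ten requests sent by the optimizer example. The Kind class here is a local stand-in that copies the documented values, and echo_reply is a hypothetical helper working on plain integers instead of Request and Reply objects.

```python
from enum import IntEnum


class Kind(IntEnum):
    # Local stand-in with the values of the documented Kind enum.
    HELLO = 0
    RESULT = 1
    RESET = 2
    BYE = 3


def echo_reply(timestamp_min: int, timestamp_max: int, nonce_start: int, nonce_size: int):
    """Return (kind, num_shares, new_block) as computed by the echo formula."""
    num_shares = (
        timestamp_min * 1000000
        + timestamp_max * 10000
        + nonce_start * 100
        + nonce_size
    )
    return Kind(timestamp_min % 4), num_shares, timestamp_min % 2 == 0


# The ten requests of the optimizer example, for i from 0 to 9.
replies = [echo_reply(i, i + 1, i // 2, i // 2 + 1) for i in range(10)]
```

Each field of the request occupies its own pair of decimal digits in num_shares, so for i = 3 the reply carries 3040102, which reads back as timestamp_min 3, timestamp_max 04, nonce_start 01 and nonce_size 02.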

Testing the examples/optimizer.py script with the echo miner requires running

python -m mqbtt.echo

and then

python examples/optimizer.py

the output of the optimizer should be

Sent request: Request(timestamp_min=0, timestamp_max=1, nonce_start=0, nonce_size=1, reset=True)
Received reply: Reply(kind=<Kind.HELLO: 0>, num_shares=10001, new_block=True)
Sent request: Request(timestamp_min=1, timestamp_max=2, nonce_start=0, nonce_size=1, reset=False)
Received reply: Reply(kind=<Kind.RESULT: 1>, num_shares=1020001, new_block=False)
Sent request: Request(timestamp_min=2, timestamp_max=3, nonce_start=1, nonce_size=2, reset=True)
Received reply: Reply(kind=<Kind.RESET: 2>, num_shares=2030102, new_block=True)
Sent request: Request(timestamp_min=3, timestamp_max=4, nonce_start=1, nonce_size=2, reset=False)
Received reply: Reply(kind=<Kind.BYE: 3>, num_shares=3040102, new_block=False)
Sent request: Request(timestamp_min=4, timestamp_max=5, nonce_start=2, nonce_size=3, reset=True)
Received reply: Reply(kind=<Kind.HELLO: 0>, num_shares=4050203, new_block=True)
Sent request: Request(timestamp_min=5, timestamp_max=6, nonce_start=2, nonce_size=3, reset=False)
Received reply: Reply(kind=<Kind.RESULT: 1>, num_shares=5060203, new_block=False)
Sent request: Request(timestamp_min=6, timestamp_max=7, nonce_start=3, nonce_size=4, reset=True)
Received reply: Reply(kind=<Kind.RESET: 2>, num_shares=6070304, new_block=True)
Sent request: Request(timestamp_min=7, timestamp_max=8, nonce_start=3, nonce_size=4, reset=False)
Received reply: Reply(kind=<Kind.BYE: 3>, num_shares=7080304, new_block=False)
Sent request: Request(timestamp_min=8, timestamp_max=9, nonce_start=4, nonce_size=5, reset=True)
Received reply: Reply(kind=<Kind.HELLO: 0>, num_shares=8090405, new_block=True)
Sent request: Request(timestamp_min=9, timestamp_max=10, nonce_start=4, nonce_size=5, reset=False)
Received reply: Reply(kind=<Kind.RESULT: 1>, num_shares=9100405, new_block=False)

while the output of the echo miner should be

Received request: Request(timestamp_min=0, timestamp_max=1, nonce_start=0, nonce_size=1, reset=True)
Sent reply: Reply(kind=<Kind.HELLO: 0>, num_shares=10001, new_block=True)
Received request: Request(timestamp_min=1, timestamp_max=2, nonce_start=0, nonce_size=1, reset=False)
Sent reply: Reply(kind=<Kind.RESULT: 1>, num_shares=1020001, new_block=False)
Received request: Request(timestamp_min=2, timestamp_max=3, nonce_start=1, nonce_size=2, reset=True)
Sent reply: Reply(kind=<Kind.RESET: 2>, num_shares=2030102, new_block=True)
Received request: Request(timestamp_min=3, timestamp_max=4, nonce_start=1, nonce_size=2, reset=False)
Sent reply: Reply(kind=<Kind.BYE: 3>, num_shares=3040102, new_block=False)
Received request: Request(timestamp_min=4, timestamp_max=5, nonce_start=2, nonce_size=3, reset=True)
Sent reply: Reply(kind=<Kind.HELLO: 0>, num_shares=4050203, new_block=True)
Received request: Request(timestamp_min=5, timestamp_max=6, nonce_start=2, nonce_size=3, reset=False)
Sent reply: Reply(kind=<Kind.RESULT: 1>, num_shares=5060203, new_block=False)
Received request: Request(timestamp_min=6, timestamp_max=7, nonce_start=3, nonce_size=4, reset=True)
Sent reply: Reply(kind=<Kind.RESET: 2>, num_shares=6070304, new_block=True)
Received request: Request(timestamp_min=7, timestamp_max=8, nonce_start=3, nonce_size=4, reset=False)
Sent reply: Reply(kind=<Kind.BYE: 3>, num_shares=7080304, new_block=False)
Received request: Request(timestamp_min=8, timestamp_max=9, nonce_start=4, nonce_size=5, reset=True)
Sent reply: Reply(kind=<Kind.HELLO: 0>, num_shares=8090405, new_block=True)
Received request: Request(timestamp_min=9, timestamp_max=10, nonce_start=4, nonce_size=5, reset=False)
Sent reply: Reply(kind=<Kind.RESULT: 1>, num_shares=9100405, new_block=False)