Building A Custom Broadcaster
Overview
Getting Started
from typing import Optional, Union

from bsv import (
    Broadcaster,
    BroadcastFailure,
    BroadcastResponse,
    Transaction,
    HttpClient,
    default_http_client,
)


class WOC(Broadcaster):
    """Broadcaster that submits transactions to the WhatsOnChain raw-tx endpoint."""

    def __init__(
        self,
        network: str = "main",
        http_client: Optional[HttpClient] = None,
    ):
        """
        Constructs an instance of the WOC broadcaster.

        :param network: which network to use (test or main); interpolated
            directly into the endpoint URL.
        :param http_client: HTTP client to use. If None, will use default.
        """
        self.network = network
        self.URL = f"https://api.whatsonchain.com/v1/bsv/{network}/tx/raw"
        # Compare against None explicitly so that a caller-supplied client is
        # always honoured, even if the object happens to evaluate as falsy.
        self.http_client = (
            default_http_client() if http_client is None else http_client
        )

    async def broadcast(
        self, tx: Transaction
    ) -> Union[BroadcastResponse, BroadcastFailure]:
        """
        Broadcasts a transaction via WOC.

        Failures are reported through the return value rather than raised:
        any exception during the request is converted to a BroadcastFailure.

        :param tx: The transaction to broadcast; it is serialized to hex
            with ``tx.hex()`` before being sent.
        :returns: BroadcastResponse on success, otherwise BroadcastFailure.
        """
        request_options = {
            "method": "POST",
            "headers": {"Content-Type": "application/json", "Accept": "text/plain"},
            "data": {"txhex": tx.hex()},
        }
        try:
            response = await self.http_client.fetch(self.URL, request_options)
            if response.ok:
                txid = response.json()["data"]
                return BroadcastResponse(
                    status="success", txid=txid, message="broadcast successful"
                )

            # The error body may be absent or not shaped like {"data": ...}.
            # Guard the lookup so the real HTTP status code is still reported
            # instead of being replaced by the generic "500" handler below.
            try:
                description = response.json()["data"]
            except (KeyError, TypeError, ValueError):
                description = None
            return BroadcastFailure(
                status="error",
                code=str(response.status_code),
                description=description or "Broadcast failed",
            )
        except Exception as error:
            # Boundary handler: transport errors and unexpected response
            # shapes on the success path are surfaced as a generic failure.
            return BroadcastFailure(
                status="error",
                code="500",
                description=str(error) or "Internal Server Error",
            )
