User Guide
Rather than just lay out the API class by class and method by method, this User Guide walks through some use cases for the PyHam AX.25 package, starting from the basics and adding capability as it progresses.
For complete details of the classes and methods, see the API Reference.
Monitoring AX.25 traffic
As our first example, we will build up a command line tool that allows
the monitoring of received AX.25 traffic. This will be similar to a
basic version of the Linux AX.25 listen utility. We’ll assume that
packets are coming to us via some means - perhaps a KISS connection to
Direwolf - and we’ll take it from there.
First, we need to unpack the packet into something we can work with.
def print_frame(data):
frame = ax25.Frame.unpack(data)
Now we have a structured representation of the packet, derived from the
byte sequence we received. Let’s print out the source and destination,
just as listen would do.
def print_frame(data):
frame = ax25.Frame.unpack(data)
line = 'fm {!s} to {!s}'.format(frame.src, frame.dst))
print(line)
There are a couple of things to note here.
The
srcanddstproperties of theframeobject each provide us with anax25.Addressinstance. With this, we could access individual fields of the address, such as base callsign and SSID.Calling
str()on anax25.Addressreturns the usual string representation of a callsign, complete with any SSID. Since this is all we need right now, we use the!sconversion in the format string so thatstr()is invoked for us.
With the above, we now have the beginnings of our listen utility.
We’d see a sequence of lines such at the following on our console.
fm WR6ABD-5 to KU6S-2
Now let’s add any ‘via’ information that might be part of our packet. This will tell us how the packet is traveling through nodes in the network.
def print_frame(data):
frame = ax25.Frame.unpack(data)
line = 'fm {!s} to {!s}'.format(frame.src, frame.dst))
via = frame.via
if via:
line += " via "
line += " ".join([str(v) for v in via])
print(line)
A couple more things to note here:
The
viaproperty of theframeobject provides us with a tuple ofax25.Addressinstances, one for each repeater in the chain. If there are no repeaters, theviaproperty will returnNone.We’re explicitly invoking
str()here since we’re not usingformat().
Our output might now look like the following.
fm WR6ABD-5 to KU6S-2 via KLPRC3* KBERR
Note the asterisk after one of the repeaters. This indicates that the
packet has been repeated by this repeater. If we’d needed to access this
information from the ax25.Address instance, we could have done so
using the has_been_repeated property. However, in our case, it is
sufficient to let str() render this for us as part of the callsign.
Next we’ll add the control information associated with the packet. This
will necessitate looking at the frame type, so that we can determine
which fields might be relevant. This is a slightly larger code fragment,
so let’s break it down and look at it in a few isolated pieces before we
add it to our print_frame() function.
control = frame.control
ft = control.frame_type
line += " ctl {}".format(ft.name)
Things to note here:
The
controlproperty gives us structured access to the control byte of the frame, in the form of anax25.Controlinstance.The first thing we want to know is the type of the frame, so that we can make decisions about its content. The
frame_typeproperty of the control instance provides us with us a member of theax25.FrameTypeenumeration with which we can make those decisions.The names of the members of this enumeration correspond to the standard names for the frame types, so we can use that name directly in our format string.
The output from the latest version of our print_frame() might now
look something like this.
fm WR6ABD-5 to KU6S-2 via KLPRC3* KBERR ctl UI
Since we now have the frame type, we can selectively show information that is available only for certain frame types.
if not ft.is_U():
line += "(nr={})".format(control.recv_seqno)
if ft.is_I():
line += "(ns={})".format(control.send_seqno)
if ft is ax25.FrameType.I or ft is ax25.FrameType.UI:
line += " pid={:02X} len={}".format(frame.pid, len(frame.data))
Several things to note here:
The
ax25.FrameTypeenumeration also has member functions that allow the determination of the general kind of frame type. This conveniently lets us decide what to do without needing to consider all of the possible frame types.is_I()- ReturnsTruefor an I or Information frameis_S()- ReturnsTruefor an S or Supervisory frameis_U()- ReturnsTruefor a U or Unnumbered frame
Send and receive sequence numbers are available from the control instance using the
send_seqnoandrecv_seqnoproperties respectively.The Protocol Identifier, or PID, is available using the
pidproperty.The data content of the frame, if any, is available using the
dataproperty. The data is returned as abytes()instance, orNoneif there is no data.
Here’s the latest version of what our output might look like:
fm WR6ABD-5 to KU6S-2 via KLPRC3* KBERR ctl UI pid=F0 len=15
This completes our summary line, so let’s take a look at the complete
print_frame() function that we’ve constructed.
def print_frame(data):
frame = ax25.Frame.unpack(data)
line = 'fm {!s} to {!s}'.format(frame.src, frame.dst))
via = frame.via
if via:
line += " via "
line += " ".join([str(v) for v in via])
control = frame.control
ft = control.frame_type
line += " ctl {}".format(ft.name)
if not ft.is_U():
line += "(nr={})".format(control.recv_seqno)
if ft.is_I():
line += "(ns={})".format(control.send_seqno)
if ft is ax25.FrameType.I or ft is ax25.FrameType.UI:
line += " pid={:02X} len={}".format(frame.pid, len(frame.data))
print(line)
In only a few lines of code, we’ve unpacked an AX.25 packet and
summarized its contents in a manner very similar to the Linux listen
utility.
With a little additional work, we could easily add a bit more of what
the listen utility provides.
Coloring for callsigns. Instead of simply calling
str()on each address we could pass it to another function to color it first.Printing packet data. For packets that contain text, identified by a PID value of
0xF0, we could print out the data on the next line. We’d need to ensure that the characters are all printable first, though.
Adding NET/ROM routing table updates
The pyham_ax25 package includes the capability for unpacking NET/ROM
routing updates, so with our print_frame() function in hand, we can
very simply add this capability to what we already have.
First, we’ll add a couple of lines to the end of our print_frame()
function to determine whether or not the packet is of a type that allows
data, and, if it does, call a new function to print that data.
if (ft is ax25.FrameType.I or ft is ax25.FrameType.UI):
print_frame_data(frame)
Now let’s put together the beginnings of our new function.
def print_frame_data(frame):
if frame.pid == 0xF0:
print(frame.data.decode('utf-8', 'replace'))
elif frame.pid == 0xCF:
rb = ax25.netrom.RoutingBroadcast.unpack(frame.data)
A few notes:
While we’re looking at the data, we’ve gone ahead and included printing out text data, as mentioned at the end of the previous section. When the PID is
0xF0, indicating text, we simply decode the bytes into a printable string, replacing any unprintable characters.A PID of
0xCFindicates NET/ROM routing table updates. In this case, we use theax25.netrommodule to unpack the data into a structured representation that we can use to print the table.
With the structured data in hand, we can now print it out.
print("NET/ROM Routing: {}".format(rb.sender))
if rb.destinations:
for d in rb.destinations:
print(" {!s:>9} {:<6} {!s:>9} {:>3}".format(
d.callsign, d.mnemonic, d.best_neighbor, d.best_quality))
Again, a few notes:
The
ax25.netrom.RoutingBroadcastinstance provides us with the sender and a tuple of destinations. Each is accessed via a property.Each destination is an instance of
ax25.netrom.Destination, and has a set of properties representing that destination.The destination callsign and best neighbor are instances of
ax25.Address, so we use the!sconversion in the format string to obtain the appropriate string representation.The sender and each destination mnemonic are simple strings, and so can be printed out directly.
That’s it. Here is our completed print_frame_data() function.
def print_frame_data(frame):
if frame.pid == 0xF0:
print(frame.data.decode('utf-8', 'replace'))
elif frame.pid == 0xCF:
rb = ax25.netrom.RoutingBroadcast.unpack(frame.data)
print("NET/ROM Routing: {}".format(rb.sender))
if rb.destinations:
for d in rb.destinations:
print(" {!s:>9} {:<6} {!s:>9} {:>3}".format(
d.callsign, d.mnemonic, d.best_neighbor, d.best_quality))
With this addition, given an incoming NET/ROM routing table update, the
output from our print_frame() function might look like the
following.
fm WA6TOW-1 to NODES ctl UI pid=CF len=133
NET/ROM Routing: PAC
KF6ANX-5 HILL KF6ANX-5 192
KF6ANX-4 JOHN KF6ANX-4 192
N6ACK-4 LPRC3 K6JAC-4 146
N6RZR-5 RDG K6JAC-4 146
WA7DG-4 ROSE K6JAC-4 146
KI6ZHD-7 SCLARA KI6UDZ-7 134
Composing an Unproto message
Now we’ll turn our attention to the other side of the equation - creating an AX.25 packet to send out. For a simple use case, we’ll assume that we need to send out an Unproto message, for example as a beacon or as part of a weekly packet net conversation. Similarly to our earlier examples, we’ll assume that we hand off the completed packets to some transport mechanism - perhaps a KISS connection to Direwolf.
First, let’s define a couple of “constants” so that the rest of our function is a little more clear.
UNPROTO_FRAME_TYPE = ax25.FrameType.UI
UNPROTO_PID = 0xF0
Unproto messages are sent as Unnumbered Information (UI) packets, so we
can define that here. And as we saw earlier, a PID value of 0xF0 is used
to specify text content.
Now here’s all we need in order to compose our packet.
def compose_unproto_frame(src_call, dst_call, msg):
control = ax25.Control(UNPROTO_FRAME_TYPE)
frame = ax25.Frame(
dst_call,
src_call,
control=control,
pid=UNPROTO_PID,
data=msg.encode('utf-8'))
return frame
A couple of things worth noting:
The callsigns,
src_callanddst_call, may be either strings or instances ofax25.Address. In essence, this is the inverse of our use ofstr()in the earlier examples, insofar as passing in a string will cause it to be transparently converted to an instance ofax25.Addressinternally.The data passed to
ax25.Frame()must be an instance ofbytesorbytearray, so we must encode our message before passing it in.
If we were to hardcode an example of how this could be used, along with
some send_frame() function to actually send it, we might have:
src_call = 'K6EAG-2'
dst_call = 'KU6S-5'
message = 'Hello net, from Fremont, CA!'
frame = compose_unproto_frame(src_call, dst_call, message)
send_frame(frame.pack())
Notice that we call pack() on the frame instance to obtain the
bytes() we need for actually sending it. We could also just cast the
frame to bytes(), since this would call pack() behind the
scenes.
If someone were to be monitoring, using our earlier example, when this was sent, they would see this on their console:
fm K6EAG-2 to KU6S-5 ctl UI pid=F0 len=28
Hello net, from Fremont, CA!
Retrieving Port information (Linux only)
If we were to write our own listen utility based on the Linux AX.25
stack, we might want to include information on which port each packet is
received on, just as Linux’ own listen utility does. Unfortunately,
doing so is not quite as simple as it should be. For this reason, the
ax25.ports module provides functionality to assist with this.
Before we start our loop for receiving packets, we need to load in the port information from the AX.25 subsystem, like this:
ports = ax25.ports.PortInfo()
ports.load()
Once this is done, looking up the port information for each received frame is accomplished with a single call. For example:
data, addr = sock.recvfrom(1024)
port = ports.find_by_ifname(addr[0]).portname
This port information can then be passed in to a modified version of our
print_frame() function, and included in the summary line, perhaps as
the first item in the line, as Linux’ own listen utility does.
Note that AX.25 port information is not available through mechanisms such as KISS or AGWPE, so this only applies when using the native AX.25 stack directly. (While KISS and AGWPE do have a concept of a port number, this is not the same as an AX.25 port, and is effectively a sequence number assigned to a connection by a server such as Direwolf.)
Using Connected Mode (Linux only)
Although the Python socket module defines the AF_AX25 value for
the AX.25 address family, it does not actually provide the means for using
it. In particular, there is no way to construct an AX.25 address, and
therefore no way to bind to one, or connect to one.
The ax25.socket module provides variants of the standard Python
socket methods that accept (only) AX.25 addresses. In keeping with the
other modules in this package, addresses may be provided as strings (i.e.
callsigns) or as ax25.Address instances. Once a socket is established
using these methods, it can then be manipulated using the usual Python
socket methods.
This means that we can write, for example, connected mode applications in
Python. While writing a complete socket-based application is beyond the
scope of this User Guide, a full example is provided with this package,
in the form of a GUI application along the lines of the Linux axcall
utility that is included with Linux’s AX.25 software. Here we will show
how the connection is established; the remaining functionality is the
same as for regular socket applications.
First, we must create a new AX.25 socket:
sock = ax25.socket.Socket()
This socket is a subclass of the regular Python socket class, with
address family AF_AX25 and, by default, a type of SOCK_SEQPACKET,
which is what we want for connected mode use.
Now we need to create the connection between ourselves (source callsign) and our target system (destination callsign). First we bind the socket.
def connect(sock, src_call, dst_call):
sock.bind(src_call)
The code for working with the socket is a little more straightforward if we use a non-blocking socket, so we need to set that up, since it would be blocking by default.
sock.setblocking(False)
Now we can go ahead and request that a connection be made.
res = sock.connect_ex(dst_call)
This call will return immediately. To wait for the connection to complete, or to timeout, we need to use a selector to wait until the socket is writable.
if res == errno.EINPROGRESS:
with selectors.DefaultSelector() as sel:
sel.register(self._sock, selectors.EVENT_WRITE)
events = sel.select(self._CON_TIMEOUT)
if not events:
log.error("Connection attempt has timed out")
return False
elif res != 0:
log.error("Connection attempt has failed")
return False
log.info("Connection created")
return True
Note that we have simply logged a timeout or an error here. In reality, you would want to notify the user appropriately.
This completes the code required to create a connected mode socket. Let’s take a look at the finished function:
def connect(sock, src_call, dst_call):
sock.bind(src_call)
sock.setblocking(False)
res = sock.connect_ex(dst_call)
if res == errno.EINPROGRESS:
with selectors.DefaultSelector() as sel:
sel.register(self._sock, selectors.EVENT_WRITE)
events = sel.select(self._CON_TIMEOUT)
if not events:
log.error("Connection attempt has timed out")
return False
elif res != 0:
log.error("Connection attempt has failed")
return False
log.info("Connection created")
return True
That’s it. Once the connection has been created, we can use regular Python socket calls to work with this connection, sending and receiving data as usual.
For a complete working example of how a connected mode socket is created
and used in a real-world working example, see the Connect example
provided with this package.