ROS2 tutorial: let robots play Kerbal Space Program

Introduction & overview

The Robot Operating System is a great framework for building robots and other distributed systems. Amongst other benefits, it provides everything that’s required to allows users to share code effectively, and thus build on each other’s work. If you’re for example working with a 4-wheeled robot, you no longer need to solve all the common problems – like driving in a straight line, localisation, mapping and path planning from first principles. You can instead install packages that do these things for you, and configure a ROS-based system to adapt the common libraries to your robot.

Since December 2017, ROS has a new major version, ROS2. This changes a lot under the hood, but many of the well-designed concepts are still the same. This rather long blog post goes over how to build a ROS2-based “robot” that will launch and land a spacecraft in the Kerbal Space Program (KSP) game. It will cover:

  1. Docs about ROS1, ROS2 and installing ROS
  2. The plan
  3. Setting up KSP with the Telemachus (Reborn) and kOS mods and building a simple spacecraft
    • You could most likely achieve the same thing with kRPC. I chose Telemachus and kOS to show how to work with two different interfaces.
  4. ROS node (publisher) for telemetry data
  5. Service for triggering stages and turning on SAS
  6. A toy “brain” for the spacecraft, which will launch and land the spacecraft back near the spaceport, using everything we’ve built before then (video)
  7. Review & reflection
  8. Troubleshooting

Why might this be of interest? While I was familiar with ROS1 (having used it for Multi-robot learning by demonstration and ROBERt, the Recycling Of Bottles Encouragement Robot), I wanted to discover the changes in ROS2. I thought to write this as part of my journey of discovery, in case it’s of use to anyone who might be interested in an end-to-end example use of ROS2, coming from a ROS1 background. There’s no particular reason for or against choosing KSP, other than it having the required mod support, being something new to me (not having played it much) and being something different from the usual robot sims – I’ve built interfaces to other games in the past, like an LCD “instrument” for FlightGear). Covering all the ROS concepts is beyond the scope of this tutorial, but I’ll provide some links that explain the most important concepts. I’ll also assume you’re already familiar with Python and C++. I also won’t include every line of code here, but will regularly include pointers, examples and instructions instead.

With that out of the way, let’s get started!

Getting to know ROS2

Concepts & docs

If you’re new to ROS, at this stage I’d probably recommend you have a look at the ROS1 documentation to become familiar with the ROS concepts. While some things have changed in ROS2 (e.g. there’s no more ROS master), the ROS2 documentation isn’t quite as friendly to ROS-newbies as the absolutely excellent ROS1 docs (which set a really high bar). Here are some of the essential links about ROS1:

If you already know ROS1, then here are some of the most useful documents I found about ROS2:

Installation

  1. Install and test ROS2
    • As an Arch Linux user, I followed the instructions for building ROS2 from source on the Arch Linux wiki’s ROS page, while referencing the previous link
    • I installed ROS2 into /scratch/ros2_master_ws (I used the master branch, rather than a release). I’ll call this $ROS_MAIN_WS.
    • I ran into this bug, where pyside_global.h could not be found. I found some fixes/workarounds for that, which I posted at the end of the bug. I’ll try to raise a PR once I have a better understanding of the impact of those changes – they may not be required on Ubuntu (which is the officially supported distro of ROS)
  2. Next, create a place where you can place your own code. For me, this was /scratch/ros2_master_ws_mycode. You’ll want to create a src directory in there. I’ll call this location $ROS_CUSTOM_CODE_WS

The plan

To build a simple automaton that controls Kerbal Space program, we need at least one data source from KSP, and at least one way to interact with it. We’ll create a ROS node for each of these, mimicing the idea that they could be different systems (e.g. a custom sensor and a cable to a robot for controlling the wheels, respectively).

The most basic control we can expose is turning the Stability Assist (System) on or off to ensure we fly in a straight line, and to control the spacecraft stages (this allows us to light up the first booster and arm the parachute).

Lastly, we’ll create another ROS node for the “brain” that interacts with the input and output system components.

An architecture diagram showing: Kerbal Space Program in the top left hand corner, with two boxes inside labelled Telemachus and kOS (the KSP mods). Telemachus is connected to a telemetry publisher, which has a label saying it's publishing into /ksp_telemetry. kOS is connected to a node labelled Interaction service, which has a label showing that it provides the /sas and /stage services. The last node, labelled the "brain", has arrows pointing towards the services and the /ksp_telemetry topic, labelled "calls" and "subscribes to" respectively
The ROS architecture for autonomously flying a spacecraft. The rationale for using services for interacting with KSP, rather than topics (which would also be possible) is explained later in this blog post.

And here’s what it will look like (video at the end):

A screenshot of Kerbal Space Program with terminal windows and a graph of altitude against time overlaid

Kerbal Space Program setup

Prerequisites

  • Install KSP from whereever you bought it (I’m using v1.7.3 on Linux)
  • Using CKAN or otherwise, install the following mods:
    • TelemachusReborn
    • kOS
    • ModuleManager (a dependency, should be selected automatically if using CKAN)
  • Ensure the mods are compatible with your version of KSP
  • Basic KSP knowledge (at least the first two tutorials/training missions)

Building a remote-controllable spacecraft

This won’t go into orbit, but will help us test our code. Start a new game in sandbox mode, go to the Vehicle Assembly Building (VAB), and build a spacecraft using the following parts:

After assembling the parts, place the booster in stage 2, the decoupler in stage 1, and the parachute in stage 0. Configure the parachute similarly to the tutorial (min pressure = 0.75 and altitude = 1000).

A spacecraft in the Kerbal Space Centre Vehicle Assembly Bay, made of the previously mentioned parts.
The spacecraft

Now, take this spacecraft to the launch pad, and check that the Telemachus Reborn telemetry mod is working by navigating to http://localhost:8085. You should see a basic web page with a section called Graphs and Tables open. Hit the Open button, and you should see a basic altitude graph.

A web page with three graphs titled altitude, orbital velocity and true anomaly. There are some other parameters and numerical values on the right.
Basic telemachus telemetry viewer

I had a look at the network traffic in the browser’s developer tools to compare the HTTP API docs for Telemachus against what was actually happening, and it looks like requesting data is quite straightforward: just query http://localhost:8085?datalink?name1=parameter1&name2=parameter2[&...]. The parameter names come from the API docs that you can get to via http://localhost:8085/telemachus/information.html, and the names seem to map to the keys in the return payload JSON object (the values are the parameter values).

Then, test kOS by right-clicking the CX-4181 Scriptable Control System -> Open Terminal, and type PRINT "HELLO WORLD". (the dot at the end is important). If this doesn’t work, check the kOS quickstart guide. Now let’s enable the Telnet interface: on the far right hand side of the KSP UI, in the top-right corner there’s a column of buttons, the last one of which is kOS. Click that and enable the Telnet server. Check the kOS telnet docs on how to connect with a telnet client, and do a “hello world” test over telnet.

Lastly, check your spacecraft works by firing up the engine and landing everything apart from the booster safely, just like in the KSP tutorials.

The spacecraft payload is slowly descending into the sea near the Kerbal Space Center, thanks to a fully opened parachute.
First testflight

Publishing spacecraft telemetry into ROS topic

The minimum data we need to launch and land a basic spacecraft is probably the current altitude, so that we can trigger each stage of the spacecraft at specific altitudes – if we armed the parachute straight after firing up the solid fuel booster, it would deploy too quickly and result in chaos and fireworks (of course we could also do this all on a timer, but that doesn’t make for a good end-to-end tutorial; you could also use another telemetry channel like speed).

In ROS (and ROS2), you generally publish sensor data in messages to topics (ROS1 doc links) and let any interested party subscribe to the appropriate topics (rather than using a client/server model variant, for example). While we don’t have a real robot sensor here, which could for example send data via the USB interface, we can retrieve data exposed by the Telemachus mod. This allows fetching data via HTTP GET requests or websockets. For simplicity, we’ll stick to the former option.

For ROS1, there’s a really great multi-step tutorial that covers everything you need to know, including how to write Python and C++ programs that interface with ROS. The most relevant part to this section is Writing a Simple Publisher and Subscriber (Python). For ROS2, the closest equivalent tutorial on Using the rclpy API to write ROS 2 programs in Python is sadly a lot more bare-bones, so we’ll instead refer to the example code for working with topics.

Defining the message contents

While ROS1 and ROS2 contain a moderate number of pre-defined messages (run ros2 msg listto get an idea – you should prefer over creating your own to benefit from interoperability with other packages), I sadly could only find an altitude parameter in GPS messages, which contained other data we couldn’t easily source from KSP, like the number of satellites for which we have a fix. So, let’s create our own custom message format. From ROS1, I remember it was a good idea to define these in a separate package (generally suffixed with _msgs), as you might want other packages to depend on the message definitions, but not on all your other code for generating them.

Create a new directory under $ROS_CUSTOM_CODE_WS/src called ksp_msgs, and in there add the following files:

  • package.xml
  • CMakeLists.txt
  • msg/BasicTelemetry.msg

package.xml

This is the file that tells the ROS tooling what it’s called and everything it needs to know about building this package and any others that depend on it. In this case, we’ll adapt the following example package.xml file – just update the metadata (name = ksp_msgs, the rest is up to you), the dependencies and execution setup are fine.

CMakeLists.txt

Again, based on examples such as this one..

cmake_minimum_required(VERSION 3.5)

project(ksp_msgs)

# Default to C++14
if(NOT CMAKE_CXX_STANDARD)
  set(CMAKE_CXX_STANDARD 14)
endif()

if(CMAKE_COMPILER_IS_GNUCXX OR CMAKE_CXX_COMPILER_ID MATCHES "Clang")
  add_compile_options(-Wall -Wextra -Wpedantic)
endif()

find_package(ament_cmake REQUIRED)
find_package(builtin_interfaces REQUIRED)
find_package(rosidl_default_generators REQUIRED)

rosidl_generate_interfaces(ksp_msgs
  "msg/BasicTelemetry.msg"
  DEPENDENCIES builtin_interfaces
)

ament_package()

msg/BasicTelemetry.msg

These basically use an Interface Definition Language (IDL) to describe the message format. C++ and Python code is then generated from this description. See the docs under “Interfaces” on https://design.ros2.org and the ROS1 tutorial on Creating a ROS msg and srv for details.

# Universal Time (e.g. Year 1, day 1, ...)
builtin_interfaces/Time stamp

# Latitude [degrees]. Positive is north of equator; negative is south.
float64 latitude
# Longitude [degrees]. Positive is east of prime meridian; negative is west.
float64 longitude
# Altitude [m] above sea level
float64 altitude

# Heading [degrees]
float64 heading

#float64
# %age of atmospheric pressure [0..1]
float64 atmospheric_pressure_ratio

When you run colcon build the next time from $ROS_CUSTOM_CODE_WS, your message file should be built. You should now be able to see it when running ros2 msg show ksp_msgs/BasicTelemetry. You may have to run source $ROS_CUSTOM_CODE_WS/install/local_setup.<yourshell> again before showing the message definition works.

The publisher

We’ll need to create a ROS node (ROS1 doc link) that publishes the telemetry data into a topic. First, choose a ROS package name for the publisher, e.g. ksp_telemachus. Create a directory for it under $ROS_CUSTOM_CODE_WS, e.g. $ROS_CUSTOM_CODE_WS/src/ksp_telemetry. In there, create the following files and directories, largely based on the minimal publisher example:

  • setup.py – copied from example
  • setup.cfg – copied from example
  • package.xml – copied from example
  • README.md – fill with your own words of wisdom
  • ksp_telemetry – a directory to house Python code for publishing data
  • ksp_telemetry/__init__.py – Python package marker
  • ksp_telemetry/telemachus.py – main module; copy content from publisher_local_function.py.

package.xml

Set the <name> to ksp_telemetry, and add python-requests and the previously created message package ksp_msgs as <exec_depend>s (we’ll be using the requests library later) – on Arch Linux I can’t rely on rosbuild too much for dependency management as it’s only partially supported, but I think this is the right way to do it. Feel free to update the other metadata (version, description etc) to your liking.

We’ll also need the following <test_depend>encies later:

  • launch_testing_ros (this should have come with your ROS installation)
  • python3-pytest (that’s the package name for pytest on Ubuntu, on Arch Linux just drop the 3 since Python 3.x is the default these days)
  • python3-pytest-localserver (as above)

setup.py

This is a mostly a standard Python package setup.py file. Edit the setup.py file to remove references to files you’ve not kept, and add your own package metadata. Set package_name='ksp_telemetry', and entry_points to:

entry_points={
    'console_scripts': [
        'telemachus = ksp_telemetry.telemachus:main',
    ],
},

This allows you to execute the main() function inside ksp_telemetry/telemachus.py using: ros2 run ksp_telemetry telemachus

setup.cfg

Just fix the paths here, so that they refer to to $base/lib/ksp_telemetry

ksp_telemetry/telemachus.py

This is where we finally get to the meat of the problem. The basic idea is to use the Python requests library to hit the Telemachus HTTP endpoint mentioned in the KSP setup section, query some parameters, and stick those into a message. Here’s what we’ll do:

  1. We start off with imports. Note how message definitions are imported.
  2. We define the data we want to query from KSP via the Telemachus mod. To keep the conversion code small, I set the keys in the returned JSON object to match the BasicTelemetry message’s field names.
  3. There are some helper functions to populate the data in BasicTelemetry messages from a Python dictionary, and take care of any conversion as required.
  4. At the start of the main function, we create a ROS node (named ksp_publisher) and a publisher object – this allows us to publish messages to a given topic (here: ksp_telemetry). We also allocate a msg object, which we’ll re-use throughout the lifetime of this ROS node.
  5. Instead of the timer_callback in the example code sending out a numbered hello world string, we now fetch data from KSP, stuff it into a BasicTelemetry message, and publish it on the ksp_telemetry topic.
  6. The rest of the code creates a timer, runs the ROS event loop and ensures a clean shutdown of the ROS publisher and node.

Now, here’s the code:

import os

import requests
import rclpy

from builtin_interfaces.msg import Time
from ksp_msgs.msg import BasicTelemetry


DEFAULT_TELEMETRY_URL = "http://localhost:8085/telemachus/datalink"
DATA_TO_REQUEST = {
    #'p': 'p.paused',
    'stamp': 't.universalTime',
    # 'mt': 'v.missionTime',
    'heading': 'n.heading',
    # 'pitch': 'n.pitch',
    # 'roll': 'n.roll',
    'altitude': 'v.altitude',
    'longitude': 'v.long',
    'latitude': 'v.lat',
    'atmospheric_pressure_ratio': 'v.atmosphericPressure',
    # 'stage': 'f.stage'
}


def universal_time_to_datetime(float_secs):
    """
    Helper to convert a decimal time to Time(seconds, nanoseconds)
    Source: https://docs.ros.org/diamondback/api/rospy/html/rospy.rostime-pysrc.html#Time.from_sec
    """
    secs = int(float_secs)
    nsecs = int((float_secs - secs) * 1000000000)

    return Time(sec=secs, nanosec=nsecs)


def update_basic_telemetry(dest, data):
    # need to take care of timestamp datatype conversion
    data_conversion_functions = {
        'stamp': universal_time_to_datetime
    }
    for k, v in data.items():
        if k in data_conversion_functions:
            v_converted = data_conversion_functions[k](v)
            dest.__setattr__(k, v_converted)
        else:
            dest.__setattr__(k, v)


def main(args=None):
    rclpy.init(args=args)

    telemetry_url = os.environ.get("KSP_TELEMETRY_URL", DEFAULT_TELEMETRY_URL)
    node = rclpy.create_node('ksp_publisher')
    publisher = node.create_publisher(BasicTelemetry, 'ksp_telemetry', 10)

    msg = BasicTelemetry()

    def fetch_and_publish_data():
        data = requests.get(telemetry_url,
                            params=DATA_TO_REQUEST)
        try:
            data.raise_for_status()
            update_basic_telemetry(dest=msg, data=data.json())
            node.get_logger().info('Publishing: "%s"' % msg)
            publisher.publish(msg)
        except requests.HTTPError as e:
            node.get_logger().warning('Failed to retrieve data: %s' % e)

    timer_period = 0.5  # seconds
    timer = node.create_timer(timer_period, fetch_and_publish_data)

    rclpy.spin(node)

    # Destroy the timer attached to the node explicitly
    # (optional - otherwise it will be done automatically
    # when the garbage collector destroys the node object)
    node.destroy_timer(timer)
    node.destroy_node()
    rclpy.shutdown()


if __name__ == '__main__':
    main()

A more sophisticated version would receive a stream of telemetry data from KSP via e.g. websockets or a TCP/IP socket sending protobuf-encoded data (from the kRPC mod) and forward that to a ROS topic.

You can compile/install the code using e.g. colcon build --symlink-install. If you’ve created new ROS2 programs or packages, you should source $ROS_CUSTOM_CODE_WS/install/local_setup.<yourshell> again before attempting to ros2 run them.

Testing

It’s actually a lot easier to just test with the game (if you got everything right first time), but the proper order is to do unit & integration testing first.. But I won’t blame you for scrolling ahead for now and coming back to the proper testing later. ?

… with code

Create a directory called test. In there, you can create a file called test_example.py, with the following contents:

import unittest

class TestSomething(unittest.TestCase):
    def test_example(self):
        assert 2 == 1+1+1  # hmm?

You can test this by running colcon test --packages-select ksp_telemetry [--event-handlers console_cohesion+], and you should see the test failure. That last argument is there to dump the standard output of the test program to the console, so you don’t need to go look in the log file to see the error. If you want to, you can now fix the example test and unit-test the code above like you would do with any other Python code. We’ll focus on testing the ROS node in isolation instead.

For ROS1 you can find excellent and extensive documentation on quality and testing, and which is worth a read. Similarly to how you can create a launch configuration that starts all the nodes required to run your robot, you can create a test launch configuration to launch everything required to test some node in isolation, including appropriate prodding/test/stub nodes. Here’s a really simple example testing a hello world publisher with the following example test script and test launch file.

For ROS2, these details are… harder to come by. It appears that roslaunch and rostest have been replaced by.. a (ROS?) tool called launch, which somewhat confusingly also has extensions for ROS (launch_ros), which surely makes it ros_launch_ros? It’s a bit confusing. Anyway, eventually I found sufficient documentation and examples:

Building on the last example, I came up with the following code. It uses pytest-localserver (so you probably want to install that before running) to run a HTTP server for testing, serving some dummy telemetry data, while the previously written ROS node runs, fetches the data and hopefully sends it onto a topic.

First, you’ll see the setup code to define the launch configuration and deal with the http server lifecycle. The test code checks that a message with the correct contents is published onto a test topic (it makes use of topic remapping (ROS1 docs link) to change the topic name).

import unittest
import time
import json

import launch
import launch_ros

from ksp_msgs.msg import BasicTelemetry

import pytest
from pytest_localserver import http

TEST_TOPIC = 'test_ksp_telemetry'


@pytest.mark.rostest
def generate_test_description(ready_fn):
    """
    Generate a LaunchDescription that starts the KSP telemetry node and a HTTP
    server for testing.
    """
    # Sadly it doesn't look like we can easily use the pytest fixture directly
    # httpserver, because we need the lifecycle of the HTTP server to
    # roughly match the ROS node's one, as we configure the telemetry URL on
    # startup. That's OK - launch_ros gives us sufficent options to do this.
    # The main ugly bit is needing to start the HTTP server here, so that the
    # port is known before we start the ROS node.
    # See https://bitbucket.org/pytest-dev/pytest-localserver/src/25bcc0df2252d23b1553a1e69a82244c0e242106/pytest_localserver/plugin.py#lines-12

    httpserver = http.ContentServer()
    httpserver.start()
    # remap topic to test_*
    telemachus_node = launch_ros.actions.Node(
        package='ksp_telemetry',
        node_executable='telemachus',
        additional_env={
            'PYTHONUNBUFFERED': '1',
            'KSP_TELEMETRY_URL': httpserver.url
        },
        remappings=[('ksp_telemetry',  TEST_TOPIC)])

    return (
        launch.LaunchDescription([
            telemachus_node,
            launch.actions.RegisterEventHandler(
                launch.event_handlers.OnShutdown(
                    on_shutdown=lambda event, context: httpserver.stop()
                )
            ),
            # Start tests right away - no need to wait for anything
            launch.actions.OpaqueFunction(function=lambda context: ready_fn()),
        ]),
        {
            'telemachus_node': telemachus_node,
            'httpserver': httpserver
        }
    )

class TestKSPTelemachusTelemetry(unittest.TestCase):

    def test_telemachus_telemetry_is_transmitted(self,
                                                 launch_service,
                                                 telemachus_node,
                                                 httpserver):
        # serve sample data
        sample_data = {
            'stamp': 1.23,
            'heading': 0.0,
            'altitude': 73.0,
            'longitude': 43.123,
            'latitude': 12.071,
            'atmospheric_pressure_ratio': 0.97
        }
        httpserver.serve_content(
            json.dumps(sample_data),
            headers={'Content-Type': 'application/json'})
        # Get launch context ROS node
        launch_context = launch_service.context
        node = launch_context.locals.launch_ros_node

        received_msgs = []
        test_subscription = node.create_subscription(
            BasicTelemetry,
            TEST_TOPIC,
            lambda msg: received_msgs.append(msg),
            10)  # specify history depth instead of qos_profile
        try:
            # wait until we have a test message
            for retries in range(30):
                if received_msgs:
                    break
                else:
                    time.sleep(0.1)
            received_msg = received_msgs[0]
            self.assertEqual(received_msg.altitude, sample_data['altitude'])
        finally:
            node.destroy_subscription(test_subscription)

You can run this test using colcon test [...], just like the unit test . If it hangs, ensure you run it with the previously mentioned arguments to dump the standard output to the terminal, and hit CTRL+C (or hit CTRL+C and look in the log file).

… with the game

You’ll of course also want to see it in action! To do this, run:

  • The KSP game, and take the spacecraft to the launch pad
  • ros2 run ksp_telemetry telemachus
  • rqt to visualise the data
    • Go to Plugins->Visualisation->Plot, and type /ksp_telemetry/altitude into the Topic field (Topic/field might be a better label perhaps for this).
A desktop screenshot with three windows: the Kerbal Space Program game, where a spacecraft with a deployed parachute is just about to splash into the ocean; rqt showing a plot of altitude against time (looks mostly like a parabola) and a console window with debug messages showing the raw data being published.
We have an altitude vs time plot in rqt! This proves that we’ve published data from KSP into ROS.

Interacting with the game via a ROS service and kOS

Prototyping the interaction

In order for us to be able to launch and land a spacecraft back on Kerbin, the bare minimum we’ll need is access to triggering the different stages of the spacecraft. This will ignite the booster and arm the parachute. Assuming the parachute is configured correctly, and the spacecraft doesn’t steer too far off course – which we can avoid with the “Stability Assist (System)” (SAS), this should be sufficient for a bare minimum setup. This can be done with kOS. From a ROS perspective, this probably maps best into a service (see ROS1 docs on services).

Before we create a service, let’s figure out how we can programmatically interact with kOS’ telnet server to stage the spacecraft. Install the python3-pexpect library, and fire up a python/ipython console or a jupyter notebook, and put a spacecraft on the launch pad in KSP, and let’s see what we can do. Here’s my experiment:

import pexpect
import sys

cpu_to_choose = 1
kos_terminal = pexpect.spawn('telnet localhost 5410')

# Get over Select CPU prompt
kos_terminal.expect('[1]')
kos_terminal.expect('> ')
kos_terminal.send('1\r\n')

# Humble beginnings
cmd = 'PRINT "HELLO WORLD".'
kos_terminal.send(cmd + '\r\n')

kos_terminal.expect(cmd)

kos_terminal.expect('HELLO WORLD')
# There's no prompt, which is a bit annoying. Let's see what there is:
end_of_line = [(chr(x), hex(x)) for x in kos_terminal.buffer]

# Looking at these, e.g. '\x1b[12;1H' I observed that it starts with 0x1b (hex ASCII), which is `ESC`, some terminal escape sequence.
# See e.g. http://www.termsys.demon.co.uk/vtansi.htm
# What you see is some "Cursor Home" command, that takes a row and column argument separated by `;` and the escape sequence ends with the `H`.
cursor_home_escape_sequence = '\x1b\[[0-9]+;1H'

kos_terminal.expect(cursor_home_escape_sequence)

assert len(kos_terminal.buffer) == 0

# Let's fly
cmd = 'SAS ON.'
kos_terminal.send(cmd + '\r\n')
kos_terminal.expect(cursor_home_escape_sequence)
len(kos_terminal.buffer) == 0

# launch / decouple / arm parachute - repeat as necessary until you're back on Kerbin.
cmd = 'STAGE.'
kos_terminal.send(cmd + '\r\n')
kos_terminal.expect(cursor_home_escape_sequence)
len(kos_terminal.buffer) == 0

# cleanup
kos_terminal.close()

The ROS2 service

Next, we’ll need to define the service message data. This is done in a similar way to how the .msg files define the data sent to topics, except we need to define inputs and outputs. There’s an incomplete ROS2 tutorial on this, but the ROS1 tutorial on .srv files contains everything you need to know (just ignore the “building .srv files” and “Client library support” bits). However, in this case, there’s not actually much data we need to exchange! We’re literally emulating a keystroke (the space bar). The STAGE. function in kOS returns None, but our service could fail if e.g. the telnet connection wasn’t established yet, so let’s use the std_srvs.Trigger data type – it takes no inputs, but it returns a boolean flag and a message. For turning on the Stability Assist (System) (SAS) we can then use std_srvs.SetBool, which has the same return type, but takes a boolean as input, which we can use to turn SAS on or off.

Have a look at the minimal example service, and see if you can combine that with the kOS telnet interaction code above, and your knowledge of ROS so far to build two simple services, one for staging and one for turning SAS on or off.

Hint: you can do both in one class, and I didn’t have to worry about any multithreading or multiprocessing.

Run the service, e.g.: ros2 run ksp_interaction kos_service

Test it:

$ ros2 service list
/kos_service/describe_parameters
/kos_service/get_parameter_types
/kos_service/get_parameters
/kos_service/list_parameters
/kos_service/set_parameters
/kos_service/set_parameters_atomically
/sas
/stage
$ ros2 service call /sas std_srvs/srv/SetBool '{data: true}'
$ ros2 service call /stage std_srvs/srv/Trigger '{}'
requester: making request: std_srvs.srv.Trigger_Request()

response:
std_srvs.srv.Trigger_Response(success=True, message='')

# once you've reached some altitude, stage again to decouple the engine,
# and again to arm the parachute.

Putting it all together into a robot of sorts (the “brain”)

Almost there.

Now we just need some automaton to control our spacecraft. A robot. A “brain” of sorts.

We’ll build a really simple one here, using a simple Finite State Machine (FSM):

A Finite State Machine diagram, showing the following states, each connected with an arrow from the current to the next state in the list: ready, stage 2, stage 1, stage 0, ground. All but the last transition are labelled with "stage". The last transition is labelled with "no change in altitude for X samples".
A simple FSM for controlling a robotic spacecraft.

The spacecraft will start off in “ready” state. It will then turn on SAS and issue the stage command to light the booster. Once a certain height threshold is reached, it will stage again and drop the booster. When the spacecraft has reached its maximum height and is on its way down again. it will arm the parachute, which will then automatically deploy near the surface of planet Kerbin.

To accomplish the above, we’ll be subscribing to the telemetry data published earlier, and using the service to enable SAS and trigger the spacecraft stages.

While we could create this in Python again, let’s use C++ this time to see how nicely ROS2 allows us to mix languages.

As you can probably guess, I’ll refer to some examples again (though I’d prefer referring to docs).

In addition, we only need a short CMakeLists.txt file, and a package.xml adapted from one of the examples.

CMakeLists.txt

cmake_minimum_required(VERSION 3.5)
project(ksp_brain)

# Default to C++14
if(NOT CMAKE_CXX_STANDARD)
  set(CMAKE_CXX_STANDARD 14)
endif()

if(CMAKE_COMPILER_IS_GNUCXX OR CMAKE_CXX_COMPILER_ID MATCHES "Clang")
  add_compile_options(-Wall -Wextra -Wpedantic)
endif()

find_package(ament_cmake REQUIRED)
find_package(rclcpp REQUIRED)
find_package(std_srvs REQUIRED)
find_package(ksp_msgs REQUIRED)

add_executable(fsm_brain fsm_brain.cpp)
ament_target_dependencies(fsm_brain rclcpp std_srvs ksp_msgs)

install(TARGETS
  fsm_brain
  DESTINATION lib/${PROJECT_NAME})

ament_package()

package.xml

Just copy one of the C++ examples mentioned above, and adjust the metadata (name, description, version, maintainer) to your liking, and ensure the following are listed as both build_depend and exec_depend:

  • rclcpp
  • std_srvs
  • ksp_msgs

fsm_brain.cpp

The code below tries to highlight the main ROS2 bits of interest. It starts off with some imports etc, before defining the TelemetrySubscriber and InteractionService (I should probably come up with a better name for that one) classes. These encapsulate the external interfaces, so that we can keep the amount of ROS-specific code in the following BrainFSM class to a minimum – this should facilitate testing. We are pragmatig and allow for some, so that we can e.g. make use of the ROS logging.

The main FSM code is in the BrainFSM::tick function.

Lastly, there’s the main function, and a little (optional) countdown helper for some extra flair, and the main event loop where we spin the ROS node. I’ve highlighted the most important bits below.

#include <chrono>
#include <cinttypes>
#include <memory>
#include <cmath>
#include <vector>
#include "rclcpp/rclcpp.hpp"

#include "std_srvs/srv/trigger.hpp"
#include "std_srvs/srv/set_bool.hpp"
#include "ksp_msgs/msg/basic_telemetry.hpp"
using std::placeholders::_1;
using namespace std::chrono_literals;

using Trigger = std_srvs::srv::Trigger;
using SetBool = std_srvs::srv::SetBool;


class TelemetrySubscriber
{
public:
    const int num_altitude_messages_same_to_consider_constant = 10;
    TelemetrySubscriber(std::shared_ptr<rclcpp::Node> n)
    {
        current_altitude_ = 0;
        max_altitude_ = 0;
        is_ready_ = false;
        node_ = n;
        subscription_ = n->create_subscription<ksp_msgs::msg::BasicTelemetry>(
            "ksp_telemetry", 10, std::bind(&TelemetrySubscriber::topic_callback, this, _1));
    }

    double get_current_altitude() {
        return current_altitude_;
    }

    double get_max_altitude() {
        return max_altitude_;
    }

    bool is_current_altitude_constant() {
        return num_altitude_messages_without_change_ >= num_altitude_messages_same_to_consider_constant;
    }

    // NB: this will spin until done
    bool wait_until_ready()
    {
        RCLCPP_INFO(node_->get_logger(), "Waiting for first telemetry message...");
        rclcpp::WallRate loop_rate(2);
        while (rclcpp::ok() && !is_ready_) {
            rclcpp::spin_some(node_);
            loop_rate.sleep();
        }
        return is_ready_;
    }

private:
    bool is_ready_;
    std::shared_ptr<rclcpp::Node> node_;
    double current_altitude_;
    // derived metrics
    int num_altitude_messages_without_change_;
    double max_altitude_;
    void topic_callback(const ksp_msgs::msg::BasicTelemetry::SharedPtr msg)
    {
        is_ready_ = true;
        if (abs(msg->altitude - current_altitude_) < 10e-1) {
            num_altitude_messages_without_change_++;
        } else {
            num_altitude_messages_without_change_ = 0;
        }
        current_altitude_ = msg->altitude;
        if (current_altitude_ > max_altitude_) {
            max_altitude_ = current_altitude_;
        }
        RCLCPP_DEBUG(node_->get_logger(), "Current altitude: %lf", msg->altitude);
    }
    rclcpp::Subscription<ksp_msgs::msg::BasicTelemetry>::SharedPtr subscription_;
};

class InteractionService
{
public:
    InteractionService(std::shared_ptr<rclcpp::Node> &node)
    {
        node_ = node;
        stage_client_ = node->create_client<Trigger>("stage");
        sas_client_ = node->create_client<SetBool>("sas");
    }

    bool wait_until_ready()
    {
        RCLCPP_INFO(node_->get_logger(), "Waiting for services...");
        while (!sas_client_->wait_for_service(std::chrono::seconds(1))) {
            if (!rclcpp::ok()) {
                RCLCPP_ERROR(node_->get_logger(), "client interrupted while waiting for service SAS to appear.");
                return false;
            }
            RCLCPP_INFO(node_->get_logger(), "waiting for service SAS to appear...");
        }

        while (!stage_client_->wait_for_service(std::chrono::seconds(1))) {
            if (!rclcpp::ok()) {
                RCLCPP_ERROR(node_->get_logger(), "client interrupted while waiting for service stage to appear.");
                return false;
            }
            RCLCPP_INFO(node_->get_logger(), "waiting for service stage to appear...");
        }
        return true;
    }

    // NB: this will spin until done
    bool enable_sas()
    {
        RCLCPP_INFO(node_->get_logger(), "Enabling SAS...");
        auto sas_request = std::make_shared<SetBool::Request>();
        sas_request->data = true; // SAS ON.
        auto result_future = sas_client_->async_send_request(sas_request);
        if (rclcpp::spin_until_future_complete(node_, result_future, 10s) !=
              rclcpp::executor::FutureReturnCode::SUCCESS)
        {
            RCLCPP_ERROR(node_->get_logger(), "set SAS service call failed :(");
            return false;
        }
        auto result = result_future.get();
        RCLCPP_INFO(node_->get_logger(), "result of enable SAS: %s",
                    result->success ? "success" : "fail");
        return result->success;
    }

    // NB: this will spin until done
    bool stage()
    {
        RCLCPP_INFO(node_->get_logger(), "Staging...");
        auto stage_request = std::make_shared<Trigger::Request>();
        auto result_future = stage_client_->async_send_request(stage_request);
        if (rclcpp::spin_until_future_complete(node_, result_future, 3s) !=
            rclcpp::executor::FutureReturnCode::SUCCESS)
        {
            RCLCPP_ERROR(node_->get_logger(), "stage service call failed :(");
            return false;
        }
        auto result = result_future.get();
        RCLCPP_INFO(node_->get_logger(), "result of stage: %s",
                    result->success ? "success" : "fail");
        return result->success;
    }

private:
    rclcpp::Client<Trigger>::SharedPtr stage_client_;
    rclcpp::Client<SetBool>::SharedPtr sas_client_;
    std::shared_ptr<rclcpp::Node> node_;
};

enum FSMState {
    ready = 10,
    stage2 = 2,
    stage1 = 1,
    stage0 = 0,
    back_on_ground = -1};

class BrainFSM {
public:
    BrainFSM(std::shared_ptr<rclcpp::Node> node, int stage1_altitude) :
        node_(node),
        telemetry_subscriber_(TelemetrySubscriber(node)),
        interaction_service_(InteractionService(node))
    {
        state_ = ready;
        stage1_altitude_ = stage1_altitude;
    }

    bool wait_until_ready()
    {
        return (interaction_service_.wait_until_ready()
                && telemetry_subscriber_.wait_until_ready());
    }

    void tick()
    {
        FSMState previous_state = state_;
        switch(state_)
        {
        case ready:
            if (!interaction_service_.enable_sas())
                break;
            if (!interaction_service_.stage())
                break;
            state_ = stage2;
            break;
        case stage2:
            if (telemetry_subscriber_.get_current_altitude() > stage1_altitude_)
            {
                if (!interaction_service_.stage())
                    break;
                state_ = stage1;
            }
            break;
        case stage1:
            {
                bool on_way_down = telemetry_subscriber_.get_current_altitude() < telemetry_subscriber_.get_max_altitude();
                if (on_way_down)
                {
                    if(!interaction_service_.stage())
                        break;
                    state_ = stage0;
                }
            }
            break;
        case stage0:
            if (telemetry_subscriber_.is_current_altitude_constant())
            {
                state_ = back_on_ground;
            }
            break;
        case back_on_ground:
            // we're done
            rclcpp::shutdown();
            break;
        }
        if (previous_state != state_) {
            RCLCPP_INFO(node_->get_logger(), "State change: %d -> %d",
                        previous_state, state_);
        }
    }

private:
    std::shared_ptr<rclcpp::Node> node_;
    TelemetrySubscriber telemetry_subscriber_;
    InteractionService interaction_service_;
    int stage1_altitude_;
    FSMState state_;
};


void countdown(std::shared_ptr<rclcpp::Node> node)
{
    RCLCPP_INFO(node->get_logger(), "Countdown started");
    rclcpp::WallRate loop_rate(1);
    for (int i = 10; i >= 0 && rclcpp::ok(); i--) {
        rclcpp::spin_some(node);
        RCLCPP_INFO(node->get_logger(), "%d...", i);
        loop_rate.sleep();
    }
}


int main(int argc, char * argv[])
{
  rclcpp::init(argc, argv);
  int stage1_altitude = 10000;

  auto node = rclcpp::Node::make_shared("ksp_fsm_brain");

  RCLCPP_DEBUG(node->get_logger(), "Initialising FSM");
  auto brain = BrainFSM(node, stage1_altitude);
  if (!brain.wait_until_ready())
  {
      RCLCPP_ERROR(node->get_logger(), "Initialisation failed");
      rclcpp::shutdown();
      return -1;
  }

  /*
   * Run the FSM.
   * An event-driven FSM would be ideal, but to keep the interfaces and boiler-
   * plate code minimal, I thought to go with a single-threaded constant-rate
   * one.
   *
   * Care must be taken with multithreading, nodes & spinners:
   * - You shouldn't spin a node from multiple threads (see
   *   https://github.com/ros2/rclcpp/issues/206).
   * - I tried using a timer (node->create_wall_timer), but "recursive spinning"
   *   isn't currently supported (see TODO in
   *   https://github.com/ros2/rclcpp/blob/83beaf8a3f9c6ac417fcb170b24712f95c67c5c2/rclcpp/include/rclcpp/executors.hpp#L76)
   */
  RCLCPP_INFO(node->get_logger(), "Init complete, all systems online.");
  // optional countdown
//  countdown(node);
  rclcpp::WallRate loop_rate(2);
  while (rclcpp::ok()) {
      rclcpp::spin_some(node);
      brain.tick();
      loop_rate.sleep();
  }

  return 0;
}

If something doesn’t work, you can build the project with debug symbols using e.g.: colcon build --symlink-install --packages-select ksp_brain --cmake-args -DCMAKE_BUILD_TYPE=Debug. Then, you can run the code in gdb using ros2 run --prefix 'gdb ...' ..., as explained here.

Running everything together

Place spacecraft on launchpad in KSP. Then, start up all the services in different terminals:

ros2 run ksp_interaction kos_service
ros2 run ksp_telemetry telemachus
ros2 run ksp_brain fsm_brain

.. and you should see the following:

Enjoy the view

Review & reflection

Let’s recapt what we’ve been through. As a ROS2 learning exercise, we created a really simple automaton that launches and lands a spacecraft in KSP using ROS2. We used ROS2 nodes, topics, services, defined a custom message format, C++ and Python.

Thinking about the experience of getting started with ROS1 vs ROS2, ROS1 was definitely easier, and the introductory material was top notch. I think ROS2 will get there, but right now I think you need to know a little bit what you’re looking for if you want to use ROS2. The collection of CLI tools from ROS1 (rosbag, roslaunch, rostest, …) in ROS2 are now unified under the ros2 (and maybe colcon) command(s), which is awesome for discoverability (ros2 -h). There are still some rough edges with some of the libraries – e.g. the missing time conversion helper function, and some smaller bugs I ran into – but overall the system feels usable. I’m definitely excited and optimistic about the future of ROS2!

Is what we built here useful as-is? Well, if you want to fly spacecraft in kOS without hitting buttons, you probably could’ve done it all with a small kOS script – in fact, there’s a nice example in their tutorial that’s a lot more sophisticated. If you want to use ROS/ROS2, then you probably want to hook it up to the Gazebo simulator instead – but I expect most of what we did here to already be taken care off when interfacing with Gazebo. While you may not have the opportunity to work at NASA writing ROS code for Robonaut2 (a robot that works with ROS), perhaps you can expand on the code and drive a Moon Mun-rover instead in KSP? Also – was this a more fun way to learn ROS2 than some of the other options? Is it useful to have another end-to-end walkthrough of ROS? You’ll be the judge, and tell me in the comments! ?

Troubleshooting

Bugs I ran into

While running colcon test

ImportError: cannot import name 'EXIT_TESTSFAILED' from '_pytest.main' (/usr/lib/python3.7/site-packages/_pytest/main.py)

This is a known issue – https://github.com/colcon/colcon-core/issues/196, and I fixed it by updating to a later version (flagged AUR package out of date).

Leave a Reply

Your email address will not be published. Required fields are marked *

This site uses Akismet to reduce spam. Learn how your comment data is processed.