1.8 Calling a service in Python

1.8 Calling a service in Python#

Although calling the services from commandline might be useful for debuggin perposes (like with the topics), it is more useful to be able to call services from the ROS2 Python or C++ API.

Like with the service we first need to find out the type of the service message. Although the commandline command already needed it, you can still get the type:

mirte_msgs/srv/SetMotorSpeed
mirte$ ros2 service type /mirte/set_right_speed

And like the services, we can also have a look at how the interface itself is defined.

int32 speed
---
bool status
mirte$ ros2 interface show mirte_msgs/srv/SetMotorSpeed

As you can see, there is a difference with the interface we used for topics. The topics just used a message type containing data. The interface for services also contain the direction of this data. In this case the service gets a speed (as int32) as input and returns a status (as bool).

Now that we have all the ingredients to call this service from python. Since we do not want to get rid of the previous node that subscribed to the sensordata, we can make another node to call the setSpeed service. As a reminder, this involves:

[INFO] [1715679971.177088773] [client_example_node]: Initialized example client
[INFO] [1715679971.189915283] [client_example_node]: Response: True
  1. Creating a new node called service.py

  2. Put the code below in that file

  3. Adding this node to setup.py

  4. Build everything

  5. Start the node

 1import rclpy
 2from rclpy.node import Node
 3from mirte_msgs.srv import SetMotorSpeed
 4
 5class ClientExampleNode(Node):
 6   def __init__(self):
 7      super().__init__("client_example_node")
 8      self._client = self.create_client(
 9         SetMotorSpeed,
10         "/mirte/set_left_speed")
11
12      while not self._client.wait_for_service(timeout_sec=10.0):
13         self.get_logger().info("Service not available")
14
15      self.get_logger().info("Initialized example client")
16
17   def send_request(self, request):
18      future = self._client.call_async(request)
19      rclpy.spin_until_future_complete(self, future)
20      return future.result()
21
22def main():
23   rclpy.init()
24   client_node = ClientExampleNode()
25   request = SetMotorSpeed.Request()
26   request.speed = 50
27   response = client_node.send_request(request)
28   client_node.get_logger().info("Response: " + str(response.status))
29   rclpy.shutdown()