From 3a8b3bebc6ff75c1e9214c43ce82b95098742c84 Mon Sep 17 00:00:00 2001 From: "komment-ai[bot]" <122626893+komment-ai[bot]@users.noreply.github.com> Date: Wed, 3 Jul 2024 15:56:15 +0000 Subject: [PATCH 1/3] Added comments to 8 functions across 1 file --- .komment/00000.json | 217 +++++++++++++++++++++++++++++++++++++++++ .komment/komment.json | 15 +++ pyFRI/tools/filters.py | 84 ++++++++++++++++ 3 files changed, 316 insertions(+) create mode 100644 .komment/00000.json create mode 100644 .komment/komment.json diff --git a/.komment/00000.json b/.komment/00000.json new file mode 100644 index 0000000..a2931a6 --- /dev/null +++ b/.komment/00000.json @@ -0,0 +1,217 @@ +[ + { + "name": "filters.py", + "path": "pyFRI/tools/filters.py", + "content": { + "structured": { + "description": "Three classes: StateFilter, ExponentialStateFilter, and MovingAverageFilter, which are subclasses of ABC's abstract class StateFilter. These filters are used to smooth or average a sequence of values, such as time series data, to reduce noise and highlight trends. The classes use different methods to filter the data, including exponential smoothing with a parameter for controlling the smoothing level, and moving average filtering.", + "items": [ + { + "id": "4fc5d738-da39-15b1-b944-b54021738842", + "ancestors": [], + "description": "Initializes a window of fixed size and appends elements to it using the `append()` method. It then filters the elements in the window using an abstract method.", + "attributes": [ + { + "name": "_window_size", + "type_name": "int", + "description": "Used to control the size of the sliding window used for filtering." + }, + { + "name": "reset", + "type_name": "instance", + "description": "Used to reset the internal buffer of the object to its initial state by clearing it of all elements." + } + ], + "name": "StateFilter", + "location": { + "start": 6, + "insert": 7, + "offset": " ", + "indent": 4, + "comment": null + }, + "item_type": "class", + "length": 14, + "docLength": null + }, + { + "id": "fb222aeb-9c23-64b1-464c-1432b304bcbf", + "ancestors": [ + "4fc5d738-da39-15b1-b944-b54021738842" + ], + "description": "Sets the `window_size` attribute of an instance of `StateFilter`, which is used to control the size of the moving window for filtering states. The `reset()` method is also defined within the function, which resets the internal state of the filter.", + "params": [ + { + "name": "window_size", + "type_name": "int", + "description": "Used to set the size of the sliding window used for computing moving averages." + } + ], + "returns": null, + "name": "__init__", + "location": { + "start": 7, + "insert": 8, + "offset": " ", + "indent": 8, + "comment": null + }, + "item_type": "method", + "length": 3, + "docLength": null + }, + { + "id": "b1e51236-70cd-f1aa-b94c-5a9f94cd9062", + "ancestors": [ + "4fc5d738-da39-15b1-b944-b54021738842" + ], + "description": "In the `StateFilter` class inherited from `abc.ABC`, does not perform any operation as it is marked as `@abc.abstractmethod`.", + "params": [ + { + "name": "x", + "type_name": "abcobject", + "description": "Used to represent any object that can be passed through an abstraction method." + } + ], + "returns": null, + "name": "filter", + "location": { + "start": 17, + "insert": 19, + "offset": " ", + "indent": 8, + "comment": null + }, + "item_type": "method", + "length": 3, + "docLength": null + }, + { + "id": "980780df-53ca-01a5-ad45-f91cc65929a8", + "ancestors": [], + "description": "Filters time-series data by taking an exponentially smoothed average of the previous values, with a smoothing parameter to control the degree of smoothing.", + "attributes": [ + { + "name": "_smooth", + "type_name": "float", + "description": "Set to a value of 0.02 during initialization, which represents the smoothing parameter used to blend the input values with the previous values in the window." + } + ], + "name": "ExponentialStateFilter", + "location": { + "start": 22, + "insert": 23, + "offset": " ", + "indent": 4, + "comment": null + }, + "item_type": "class", + "length": 12, + "docLength": null + }, + { + "id": "9deaa9bd-88f5-95b3-9d4b-805df833dc95", + "ancestors": [ + "980780df-53ca-01a5-ad45-f91cc65929a8" + ], + "description": "Sets up an instance of the `ExponentialStateFilter` class by initializing its internal variable `smooth` to a value passed as an argument, and then calls the parent class's `__init__` method to initialize the filter with a default value of 1.", + "params": [ + { + "name": "smooth", + "type_name": "float", + "description": "Set to `0.02`. Its value influences the initial position of the random walk." + } + ], + "returns": null, + "name": "__init__", + "location": { + "start": 23, + "insert": 24, + "offset": " ", + "indent": 8, + "comment": null + }, + "item_type": "method", + "length": 3, + "docLength": null + }, + { + "id": "0f1875a4-2763-91b9-7f4d-2b1e605d0ca3", + "ancestors": [ + "980780df-53ca-01a5-ad45-f91cc65929a8" + ], + "description": "Takes an input `x` and applies an exponential smoothing filter to it, appending the smoothed value to a internal list. The filter uses a parameter `smooth` to control the degree of smoothing.", + "params": [ + { + "name": "x", + "type_name": "object", + "description": "Used to represent an input value that is to be filtered using a smoothing window." + } + ], + "returns": { + "type_name": "nparray", + "description": "A smoothed version of the input value `x`, created by multiplying it with a smoothing factor and adding the initial value of the window array." + }, + "name": "filter", + "location": { + "start": 27, + "insert": 28, + "offset": " ", + "indent": 8, + "comment": null + }, + "item_type": "method", + "length": 7, + "docLength": null + }, + { + "id": "d3a882da-1a3f-6c89-0345-b747725b7bb1", + "ancestors": [], + "description": "Computes and returns the moving average of a sequence of values using the provided window size.", + "attributes": [], + "name": "MovingAverageFilter", + "location": { + "start": 36, + "insert": 37, + "offset": " ", + "indent": 4, + "comment": null + }, + "item_type": "class", + "length": 4, + "docLength": null + }, + { + "id": "be20df6e-79de-4b95-ca4b-05889468ac88", + "ancestors": [ + "d3a882da-1a3f-6c89-0345-b747725b7bb1" + ], + "description": "Appends an input value to a internal buffer and calculates the mean of the buffer along the axis 0.", + "params": [ + { + "name": "x", + "type_name": "object", + "description": "Passed to the method for filtering." + } + ], + "returns": { + "type_name": "npmean", + "description": "A measure of central tendency of a set of numbers." + }, + "name": "filter", + "location": { + "start": 37, + "insert": 38, + "offset": " ", + "indent": 8, + "comment": null + }, + "item_type": "method", + "length": 3, + "docLength": null + } + ] + } + } + } +] \ No newline at end of file diff --git a/.komment/komment.json b/.komment/komment.json new file mode 100644 index 0000000..c140fdf --- /dev/null +++ b/.komment/komment.json @@ -0,0 +1,15 @@ +{ + "meta": { + "version": "1", + "updated_at": "2024-07-03T15:56:10.941Z", + "created_at": "2024-07-03T15:56:11.430Z", + "pipelines": [ + "5e40830f-82dd-45be-9f09-2d5ccc7383a8" + ] + }, + "lookup": [ + [ + "pyFRI/tools/filters.py" + ] + ] +} \ No newline at end of file diff --git a/pyFRI/tools/filters.py b/pyFRI/tools/filters.py index 7695ebf..f7a18f2 100644 --- a/pyFRI/tools/filters.py +++ b/pyFRI/tools/filters.py @@ -4,7 +4,30 @@ class StateFilter(abc.ABC): + """ + Initializes a window of fixed size and appends elements to it using the + `append()` method. It then filters the elements in the window using an abstract + method. + + Attributes: + _window_size (int): Used to control the size of the sliding window used + for filtering. + reset (instance): Used to reset the internal buffer of the object to its + initial state by clearing it of all elements. + + """ def __init__(self, window_size): + """ + Sets the `window_size` attribute of an instance of `StateFilter`, which + is used to control the size of the moving window for filtering states. The + `reset()` method is also defined within the function, which resets the + internal state of the filter. + + Args: + window_size (int): Used to set the size of the sliding window used for + computing moving averages. + + """ self._window_size = window_size self.reset() @@ -16,15 +39,60 @@ def append(self, x): @abc.abstractmethod def filter(self, x): + """ + In the `StateFilter` class inherited from `abc.ABC`, does not perform any + operation as it is marked as `@abc.abstractmethod`. + + Args: + x (abcobject): Used to represent any object that can be passed through + an abstraction method. + + """ pass class ExponentialStateFilter(StateFilter): + """ + Filters time-series data by taking an exponentially smoothed average of the + previous values, with a smoothing parameter to control the degree of smoothing. + + Attributes: + _smooth (float): Set to a value of 0.02 during initialization, which + represents the smoothing parameter used to blend the input values with + the previous values in the window. + + """ def __init__(self, smooth=0.02): + """ + Sets up an instance of the `ExponentialStateFilter` class by initializing + its internal variable `smooth` to a value passed as an argument, and then + calls the parent class's `__init__` method to initialize the filter with + a default value of 1. + + Args: + smooth (float): Set to `0.02`. Its value influences the initial position + of the random walk. + + """ super().__init__(1) self._smooth = smooth def filter(self, x): + """ + Takes an input `x` and applies an exponential smoothing filter to it, + appending the smoothed value to a internal list. The filter uses a parameter + `smooth` to control the degree of smoothing. + + Args: + x (object): Used to represent an input value that is to be filtered + using a smoothing window. + + Returns: + nparray: A smoothed version of the input value `x`, created by multiplying + it with a smoothing factor and adding the initial value of the window + array. + + """ if len(self._window) == 0: self.append(x) xp = np.array(self._window[0]) @@ -34,6 +102,22 @@ def filter(self, x): class MovingAverageFilter(StateFilter): + """ + Computes and returns the moving average of a sequence of values using the + provided window size. + + """ def filter(self, x): + """ + Appends an input value to a internal buffer and calculates the mean of the + buffer along the axis 0. + + Args: + x (object): Passed to the method for filtering. + + Returns: + npmean: A measure of central tendency of a set of numbers. + + """ self.append(x) return np.mean(self._window, axis=0) From 369542ca22e0f19868f9ad2ca1f77f32f90a6223 Mon Sep 17 00:00:00 2001 From: "komment-ai[bot]" <122626893+komment-ai[bot]@users.noreply.github.com> Date: Sat, 7 Sep 2024 14:32:02 +0000 Subject: [PATCH 2/3] Added comments to 79 items across 10 files --- examples/LBRJointSineOverlay.py | 118 +++++++++++++++++- examples/LBRTorqueSineOverlay.py | 121 ++++++++++++++++++- examples/LBRWrenchSineOverlay.py | 112 ++++++++++++++++- examples/admittance.py | 44 ++++++- examples/hand_guide.py | 93 +++++++++++++-- examples/ik.py | 34 +++++- examples/joint_teleop.py | 122 ++++++++++++++++++- examples/task_teleop.py | 134 ++++++++++++++++++++- pyFRI/tools/state_estimators.py | 198 ++++++++++++++++++++++++++++++- setup.py | 80 ++++++++++--- 10 files changed, 995 insertions(+), 61 deletions(-) diff --git a/examples/LBRJointSineOverlay.py b/examples/LBRJointSineOverlay.py index 0623ca3..504cea4 100644 --- a/examples/LBRJointSineOverlay.py +++ b/examples/LBRJointSineOverlay.py @@ -1,14 +1,61 @@ -import sys -import math import argparse +import math +import sys + +import matplotlib.pyplot as plt import numpy as np import pandas as pd -import matplotlib.pyplot as plt + import pyFRI as fri class LBRJointSineOverlayClient(fri.LBRClient): + """ + Simulates a sine wave overlay on specific joints of a robot, with adjustable + frequency, amplitude, and filtering. It monitors state changes to update its + parameters and applies the simulated movement when ready. It also waits for + commands and adjusts joint positions accordingly. + + Attributes: + joint_mask (numpyndarray|int): Used to select a subset of joints for which + sine overlay is applied, ignoring others. + freq_hz (float): Initialized by the user when creating an instance of the + class. It represents a frequency value in Hertz that determines the + oscillation period of a sine wave overlay for joint positions. + ampl_rad (float): Used to scale the sine wave offset values in radians. + It controls the amplitude of the sinusoidal motion added to the joint + positions. + filter_coeff (float): Used as a coefficient for a simple exponential + smoothing filter to smooth out the oscillation amplitude. + offset (float): Initialized to 0. It represents a filtered sine wave offset + value used to update joint positions during the command method. + phi (float): Used as a phase angle in a mathematical function, specifically + the sine function. It increments by `self.step_width` each time the + command method is called. + step_width (float): 0 by default. It calculates the angular step for sine + wave overlay based on frequency, sample time, and is updated in `onStateChange`. + + """ def __init__(self, joint_mask, freq_hz, ampl_rad, filter_coeff): + """ + Initializes an object with specified parameters, including joint mask, + frequency, amplitude, and filter coefficient, as well as additional + attributes for offset, phase, and step width. + + Args: + joint_mask (bool | np.ndarray): 2-dimensional. It represents joint + (common) mask, likely used to specify which elements are valid for + further calculations or filtering processes. + freq_hz (float): Expected to be set to a value representing frequency + in hertz. It initializes an instance variable `freq_hz` of the class. + ampl_rad (float): 0.0 by default. It appears to represent an amplitude + value in radians, suggesting it could be related to oscillatory + behavior or wave properties. + filter_coeff (float | int): Assigned from an unknown source. Its purpose + is unclear without more context, but its name suggests it could + be related to filter coefficients used in signal processing. + + """ super().__init__() self.joint_mask = joint_mask self.freq_hz = freq_hz @@ -22,6 +69,18 @@ def monitor(self): pass def onStateChange(self, old_state, new_state): + """ + Resets certain parameters when the robot transitions to the 'MONITORING_READY' + state, and prints a message indicating the change in state. + + Args: + old_state (str | Enum): Described by the documentation as "the previous + state". It represents the state of the object before it was changed. + new_state (Enum[fri.ESessionState]): Passed to the function when the + state of an object changes. The specific value depends on the + context in which the function is called. + + """ print(f"Changed state from {old_state} to {new_state}") if new_state == fri.ESessionState.MONITORING_READY: self.offset = 0.0 @@ -31,11 +90,23 @@ def onStateChange(self, old_state, new_state): ) def waitForCommand(self): + """ + Sets the joint position of the robot based on the current Ipo joint position, + converting it to a float32 format before applying it. It appears to be + waiting for a command from an external source or system. + + """ self.robotCommand().setJointPosition( self.robotState().getIpoJointPosition().astype(np.float32) ) def command(self): + """ + Updates the offset for joint movements, calculates the new position based + on the updated offset and applies it to the robot's joints. It uses a + low-pass filter to smoothly transition between new and previous offsets. + + """ new_offset = self.ampl_rad * math.sin(self.phi) self.offset = (self.offset * self.filter_coeff) + ( new_offset * (1.0 - self.filter_coeff) @@ -49,7 +120,35 @@ def command(self): def get_arguments(): + """ + Parses command-line arguments for a program that communicates with a KUKA + Sunrise Controller, including hostname, port number, joint mask, frequency, + amplitude, filter coefficient, and data saving flag. It uses type conversion + and error checking to validate input values. + + Returns: + argparseNamespace: An object containing all the command line arguments + parsed by ArgumentParser. This object has attributes for each argument + added to the parser. + + """ def cvt_joint_mask(value): + """ + Converts a given value into an integer that represents a joint mask and + checks if it falls within the specified valid range (0 to 6). If not, it + raises an error with a descriptive message indicating the invalid value + and the acceptable range. + + Args: + value (Union[int, str]): Expected to be either an integer within the + specified range or a string that can be converted to an integer + within this range. + + Returns: + int|None: A integer value representing joint mask index from 0 to 6 + if valid input is provided otherwise it raises an ArgumentTypeError exception. + + """ int_value = int(value) if 0 <= int_value < 7: return int_value @@ -110,7 +209,18 @@ def cvt_joint_mask(value): def main(): - print("Running FRI Version:", fri.FRI_VERSION) + """ + Initializes a KUKA Sunrise controller connection, runs an LBRJointSineOverlay + client application, collects data when specified, and plots it as a 4x1 subplot + after disconnection, displaying measured and IPO positions along with torque + values over time. + + Returns: + int|None: 0 on successful completion and non-zero if connection to KUKA + Sunrise controller fails. + + """ + print("Running FRI Version:", fri.FRI_CLIENT_VERSION) args = get_arguments() client = LBRJointSineOverlayClient( diff --git a/examples/LBRTorqueSineOverlay.py b/examples/LBRTorqueSineOverlay.py index ea37aa2..8986ecc 100644 --- a/examples/LBRTorqueSineOverlay.py +++ b/examples/LBRTorqueSineOverlay.py @@ -1,13 +1,57 @@ -import sys -import math import argparse -import pyFRI as fri +import math +import sys import numpy as np +import pyFRI as fri + class LBRTorqueSineOverlayClient(fri.LBRClient): + """ + Implements a client for Lab Robot (LBR) torque control with a sine wave overlay + on specified joints. It monitors state changes, waits for commands, and sends + joint position and torque commands to the robot based on user-defined frequency + and amplitude parameters. + + Attributes: + joint_mask (Sequence[int]): Initialized in the constructor method, but its + exact nature or how it is used is unclear from this code snippet alone. + freq_hz (float): Initialized with a frequency value that will be used to + generate a sine wave for the torque command. It determines the number + of cycles per second in the sinusoidal pattern. + torque_ampl (float): Initialized with the torque amplitude value provided + to its constructor. It stores the maximum torque magnitude applied to + joints during oscillation. + phi (float): Initialized to zero. It represents a phase angle used for + generating sine wave amplitudes in the `command` method. + step_width (float): 0 by default. It represents the time increment for + updating the torque offset based on the sine wave frequency and sample + time. + torques (npndarray[float32,ndim1]): 0-indexed with a length equal to + `fri.LBRState.NUMBER_OF_JOINTS`, representing a numerical array + containing torques applied to each joint. + + """ def __init__(self, joint_mask, freq_hz, torque_amplitude): + """ + Initializes instance variables for joint mask, frequency, torque amplitude, + phase angle, step width, and torques array with zeros, setting default + values where applicable. + + Args: + joint_mask (np.ndarray[bool]): Used to specify which joints are being + controlled by the object. It has boolean values indicating whether + each joint should be included (True) or not (False). + freq_hz (float): Intended to represent a frequency value measured in + Hertz. It is used as an input to initialize the object with its + specified value. + torque_amplitude (float): Assigned to the instance attribute + `self.torque_ampl`. It represents an amplitude value related to + torque, but its precise purpose is not clearly stated in this code + snippet. + + """ super().__init__() self.joint_mask = joint_mask self.freq_hz = freq_hz @@ -20,6 +64,20 @@ def monitor(self): pass def onStateChange(self, old_state, new_state): + """ + Updates internal state variables based on changes to the robot's session + state. It resets torque values and certain parameters when the session + enters a monitoring-ready state. + + Args: + old_state (fri.ESessionState): Passed to indicate the state that + occurred before the change. It represents the current state of the + system or robot session when it transitioned from one state to another. + new_state (fri.ESessionState): Compared to a value MONITORING_READY, + which suggests it represents an enumeration or state machine that + tracks the current session state of a robotic system. + + """ print(f"State changed from {old_state} to {new_state}") if new_state == fri.ESessionState.MONITORING_READY: @@ -30,12 +88,23 @@ def onStateChange(self, old_state, new_state): ) def waitForCommand(self): + """ + Synchronizes joint positions with the robot's current IPO position, then + sets the robot's torque to a specified value if it is in Torque mode. + + """ self.robotCommand().setJointPosition(self.robotState().getIpoJointPosition()) if self.robotState().getClientCommandMode() == fri.EClientCommandMode.TORQUE: self.robotCommand().setTorque(self.torques) def command(self): + """ + Updates the robot's joint position to match its IPO (in-positioning + operation) trajectory and applies a torque offset to specific joints based + on a sinusoidal pattern, with an adjustable amplitude and phase. + + """ self.robotCommand().setJointPosition(self.robotState().getIpoJointPosition()) if self.robotState().getClientCommandMode() == fri.EClientCommandMode.TORQUE: @@ -51,7 +120,36 @@ def command(self): def get_arguments(): + """ + Parses command line arguments for a KUKA Sunrise Controller program, including + hostname, port number, joint to move, and parameters for sine wave generation. + It validates input using type conversions and range checks, raising exceptions + on invalid values. + + Returns: + argparseNamespace: A container object holding and providing access to the + arguments passed in as positional and optional (named) arguments parsed + from the command line or other input source. + + """ def cvt_joint_mask(value): + """ + Converts a given value into an integer that represents a joint mask value + within a specified range (0 to 7). It raises an error if the input is + outside this range. The conversion is implicit for values already in the + valid range. + + Args: + value (Union[int, str]): Specified as an input to the function when + it is called. It may be either an integer or a string representation + of an integer that can be converted to an integer. + + Returns: + int|None: The index of a joint mask. If the input value is within the + range [0, 7), it returns the integer value. Otherwise, it raises an + ArgumentTypeError with a message describing the valid range. + + """ int_value = int(value) if 0 <= int_value < 7: return int_value @@ -98,10 +196,23 @@ def cvt_joint_mask(value): def main(): - print("Running FRI Version:", fri.FRI_VERSION) + """ + Runs a FRI client application, connecting to a KUKA Sunrise controller over a + specified port and hostname. It then enters a loop where it steps through the + application until the session becomes idle or interrupted by a keyboard interrupt. + + Returns: + int: 1 if connection to KUKA Sunrise controller fails, and 0 otherwise. + The specific return value indicates whether the function executed successfully + or encountered an error. + + """ + print("Running FRI Version:", fri.FRI_CLIENT_VERSION) args = get_arguments() - client = LBRTorqueSineOverlayClient(args.joint_mask, args.freq_hz, args.torque_amplitude) + client = LBRTorqueSineOverlayClient( + args.joint_mask, args.freq_hz, args.torque_amplitude + ) app = fri.ClientApplication(client) success = app.connect(args.port, args.hostname) diff --git a/examples/LBRWrenchSineOverlay.py b/examples/LBRWrenchSineOverlay.py index 67216a0..e986cf8 100644 --- a/examples/LBRWrenchSineOverlay.py +++ b/examples/LBRWrenchSineOverlay.py @@ -1,13 +1,68 @@ -import sys -import math import argparse -import pyFRI as fri +import math +import sys import numpy as np +import pyFRI as fri + class LBRWrenchSineOverlayClient(fri.LBRClient): + """ + Initializes and controls a robot arm to apply sinusoidal wrenches along two + axes, while allowing for monitoring and command mode switching. It updates + wrench values based on frequencies, amplitudes, and phase offsets, ensuring + continuous motion within a periodic range. + + Attributes: + frequencyX (float|None): Initialized in the constructor to hold a frequency + value for the sine overlay along the x-axis. It stores a floating-point + number representing the frequency in Hertz. + frequencyY (float): Assigned a value through the constructor `__init__`. + It represents the frequency of a sine wave along the Y axis. + amplitudeX (float): Used to represent the amplitude of the sine wave in + the x-direction. It determines the magnitude of the oscillation applied + by the wrench command. + amplitudeY (float): Set through the constructor, initialized with a value + passed to the class instantiation. It defines the amplitude of sine + wave for the y-axis component of the wrench signal. + stepWidthX (float): 0 by default. It represents the angle increment for + the sine wave generation on the X-axis, calculated as 2π times the + frequency of oscillation multiplied by the sample time of the robot state. + stepWidthY (float): 0.0 by default. It stores the step width for updating + the Y-component of a wrench, calculated as 2 * pi times frequency Y + times sample time. + phiX (float): 0 by default. It stores a phase offset for sine wave generation + used to calculate wrench components, and is updated based on the + frequency and sample time in the `onStateChange` method. + phiY (float): 0 by default. It accumulates phase changes when commanded + to do so in methods like `command`. Its initial value is reset to zero + when the client state changes to MONITORING_READY. + wrench (ndarray[npfloat32,6]): Initialized to a zero vector with six + elements in its constructor. It represents a wrench applied to the + robot, composed of torque and force components along each axis. + + """ def __init__(self, frequencyX, frequencyY, amplitudeX, amplitudeY): + """ + Initializes an instance with parameters that define a sinusoidal wrench, + including frequencies and amplitudes for both X and Y axes, initializing + various internal attributes. + + Args: + frequencyX (float): Used to set the frequency of oscillation in the + X-direction. It represents how often the signal repeats itself + along this axis. + frequencyY (float): Set as an attribute of the class instance under + the same name. It is assigned the value passed to it when an object + of the class is created. + amplitudeX (float): 0 by default when not specified. It represents the + magnitude or size of a movement in the X-direction, likely for a + sinusoidal motion or oscillation. + amplitudeY (float): Used to set the amplitude of the movement in the + Y direction. + + """ super().__init__() self.frequencyX = frequencyX self.frequencyY = frequencyY @@ -23,6 +78,19 @@ def monitor(self): pass def onStateChange(self, old_state, new_state): + """ + Resets certain variables when the robot's state changes to MONITORING_READY, + preparing it for new movements with specified frequencies and sample times. + + Args: + old_state (ESessionState | None): Used to represent the old state of + the system before it changes, which in this case is expected to + be updated when the new state transitions into MONITORING_READY. + new_state (EnumMember[fri.ESessionState]): Checked against the value + fri.ESessionState.MONITORING_READY indicating that the session has + reached a state of readiness for monitoring. + + """ if new_state == fri.ESessionState.MONITORING_READY: self.phiX = 0.0 self.phiY = 0.0 @@ -34,11 +102,25 @@ def onStateChange(self, old_state, new_state): ) def waitForCommand(self): + """ + Sets the joint positions of the robot to its IPO (In-Positioning Override) + joint positions and optionally applies a wrench command based on the + client's current command mode. It is likely called after receiving a new + command from the server. + + """ self.robotCommand().setJointPosition(self.robotState().getIpoJointPosition()) if self.robotState().getClientCommandMode() == fri.EClientCommandMode.WRENCH: self.robotCommand().setWrench(self.wrench) def command(self): + """ + Generates and sends wrench commands to a robot in a specific client mode. + It sets joint positions based on IPO data, calculates sinusoidal wrench + values for X and Y axes, and updates these values over time by incrementing + phase angles modulo 2π. + + """ self.robotCommand().setJointPosition(self.robotState().getIpoJointPosition()) if self.robotState().getClientCommandMode() == fri.EClientCommandMode.WRENCH: self.wrench[0] = self.amplitudeX * math.sin(self.phiX) @@ -57,6 +139,18 @@ def command(self): def get_arguments(): + """ + Parses command-line arguments using the `argparse` library, extracting and + returning their values as a structured object. It defines several optional + arguments with default values for various settings related to communication + and sine wave generation. + + Returns: + argparseNamespace: An object containing all the arguments passed as a + dictionary-like object, where each key is the argument name and the + corresponding value is the parsed argument from user input. + + """ parser = argparse.ArgumentParser(description="LRBJointSineOverlay example.") parser.add_argument( "--hostname", @@ -104,7 +198,17 @@ def get_arguments(): def main(): - print("Running FRI Version:", fri.FRI_VERSION) + """ + Initializes and runs an FRI client application, connecting to a KUKA Sunrise + controller, executing a wrench-sine-overlay motion plan, and handling any + errors or interruptions that may occur during execution. + + Returns: + int|None: 1 if connection to KUKA Sunrise controller fails and 0 otherwise, + or None if the function terminates due to an exception being caught. + + """ + print("Running FRI Version:", fri.FRI_CLIENT_VERSION) args = get_arguments() print(args) diff --git a/examples/admittance.py b/examples/admittance.py index 19c288f..5cbb4b6 100644 --- a/examples/admittance.py +++ b/examples/admittance.py @@ -1,11 +1,28 @@ +import numpy as np import optas from robot import load_robot -import numpy as np - class AdmittanceController: + """ + Initializes and manages an admittance controller for a robotic arm, which + adjusts the arm's movement based on external forces or velocities while + maintaining joint limits and velocity constraints. It takes as input the robot's + current state and desired velocity, returning updated joint positions. + + """ def __init__(self, lbr_med_num): + """ + Initializes an optimization problem to control the motion of a robot + end-effector with desired velocity and minimum joint velocities, while + respecting joint limits and actuated joint range. + + Args: + lbr_med_num (int | str): Used to load a specific robot model from an + external source, such as the KUKA LBR Med arm. It likely identifies + the robot's numerical ID or configuration. + + """ # Setup robot self.ee_link = "lbr_link_ee" self.robot = load_robot(lbr_med_num, [1]) @@ -51,6 +68,29 @@ def __init__(self, lbr_med_num): self.vlim = np.concatenate(([0.2, 0.2, 0.2], np.deg2rad([40] * 3))) def __call__(self, qc, wr, dt): + """ + Simulates the dynamic behavior of an admittance-controlled system, updating + the generalized force and position based on initial conditions, solver + settings, and a time step. It returns the updated position at the specified + time. + + Args: + qc (float | int): Referenced as "qc" in several places within the code, + specifically when resetting parameters for the solver. It represents + a quantity or value in the simulation. + wr (float): Used to calculate the value of `vg` by multiplying it with + the `gain` attribute of the instance, possibly scaling or filtering + the input value. + dt (float): Used as time step in solving an initial value problem + through numerical integration, often representing the size of each + time step or increment. + + Returns: + float: The updated quality control value (`qg`) after solving a + differential equation, representing the state at the next time step + with respect to current state and input parameters. + + """ # Admittance control: map force -> velocity vg = self.gain * wr diff --git a/examples/hand_guide.py b/examples/hand_guide.py index ecb8a6e..c8c8163 100644 --- a/examples/hand_guide.py +++ b/examples/hand_guide.py @@ -1,27 +1,61 @@ -import sys -import math import argparse +import sys + +import numpy as np +from admittance import AdmittanceController + import pyFRI as fri +from pyFRI.tools.filters import ExponentialStateFilter from pyFRI.tools.state_estimators import ( - JointStateEstimator, FRIExternalTorqueEstimator, + JointStateEstimator, WrenchEstimatorTaskOffset, ) -from pyFRI.tools.filters import ExponentialStateFilter - -from admittance import AdmittanceController - -import numpy as np - -if fri.FRI_VERSION_MAJOR == 1: +if fri.FRI_CLIENT_VERSION_MAJOR == 1: POSITION = fri.EClientCommandMode.POSITION -elif fri.FRI_VERSION_MAJOR == 2: +elif fri.FRI_CLIENT_VERSION_MAJOR == 2: POSITION = fri.EClientCommandMode.JOINT_POSITION class HandGuideClient(fri.LBRClient): + """ + Initializes a hand guide client for an FRI (Force-Controlled Robot Interface) + compliant robot arm. It estimates joint states and wrenches, applies filters + to stabilize control inputs, and issues position commands to the robot based + on estimated values and controller feedback. + + Attributes: + controller (AdmittanceController|None): Initialized with lbr_ver in the + __init__ method, representing a controller object for admittance control + of a robot manipulator. + joint_state_estimator (JointStateEstimator): Initialized with `self` as + its argument, suggesting that it is used to estimate the joint states + of the robot based on data from the client itself. + external_torque_estimator (FRIExternalTorqueEstimator|None): Created by + calling the constructor FRIExternalTorqueEstimator with a reference + to the current object as its argument. + wrench_estimator (WrenchEstimatorTaskOffset|None): An instance of + WrenchEstimatorTaskOffset class. It estimates the wrench applied to + the end-effector of the robot. + wrench_filter (ExponentialStateFilter): Not used directly within the methods + in this snippet, but its filter method is called to modify a wrench + measurement before passing it through the controller. + + """ def __init__(self, lbr_ver): + """ + Initializes various components for admittance control, including estimators + and filters, setting up relationships between them based on their dependencies. + It sets up a complete structure for controlling an LBR robot arm using + admittance control methods. + + Args: + lbr_ver (str | int): Used to specify the version or variant of the + KUKA LBR robot arm model being controlled by the AdmittanceController + class. + + """ super().__init__() self.controller = AdmittanceController(lbr_ver) self.joint_state_estimator = JointStateEstimator(self) @@ -45,6 +79,12 @@ def onStateChange(self, old_state, new_state): print(f"State changed from {old_state} to {new_state}") def waitForCommand(self): + """ + Waits for a specific client command mode to be enabled, updates the wrench + estimator, retrieves the current joint position and stores it as self.q, + then calls the command_position method to send a command based on self.q. + + """ if self.robotState().getClientCommandMode() != POSITION: print( f"[ERROR] hand guide example requires {POSITION.name} client command mode" @@ -56,6 +96,13 @@ def waitForCommand(self): self.command_position() def command(self): + """ + Updates the robot's joint position based on filtered wrench data from an + estimator, controller and filter. It only proceeds with this calculation + if the estimator is ready. Otherwise, it updates the estimator and resets + the joint position. + + """ if not self.wrench_estimator.ready(): self.wrench_estimator.update() self.robotCommand().setJointPosition(self.q.astype(np.float32)) @@ -76,6 +123,17 @@ def command(self): def get_arguments(): + """ + Initializes an argument parser and defines command-line arguments for a script, + including hostname, port, and KUKA LBR Med version number, returning parsed + arguments as an object. + + Returns: + argparseNamespace: An object containing parsed arguments from the command + line, specifically hostname, port, and lbr-ver (KUKA LBR Med version number) + along with their corresponding values. + + """ parser = argparse.ArgumentParser(description="LRBJointSineOverlay example.") parser.add_argument( "--hostname", @@ -103,7 +161,18 @@ def get_arguments(): def main(): - print("Running FRI Version:", fri.FRI_VERSION) + """ + Initializes and runs a client application for interacting with a KUKA Sunrise + controller, executing steps until the robot reaches an idle state or interrupted + by user input. It handles connections, disconnections, and exceptions. + + Returns: + int: 0 on successful execution and 1 if connection to KUKA Sunrise controller + fails, indicating an unsuccessful run. The returned value is used as a + return code for the program. + + """ + print("Running FRI Version:", fri.FRI_CLIENT_VERSION) args = get_arguments() client = HandGuideClient(args.lbr_ver) diff --git a/examples/ik.py b/examples/ik.py index 1173df3..4f698dd 100644 --- a/examples/ik.py +++ b/examples/ik.py @@ -3,7 +3,6 @@ class IK: - """ This class solves the following problem @@ -35,6 +34,17 @@ class IK: """ def __init__(self, lbr_med_num): + """ + Initializes an instance, preparing it for inverse kinematics computation + using OptAS. It sets up parameters and constraints, such as joint velocity + limits and end-effector position goal, to be solved by a QP solver. + + Args: + lbr_med_num (int): Used to identify and load a specific KUKA LBR Med + robot model from a database or library of robots. It specifies the + robot's medium variant number. + + """ # Setup robot ee_link = "lbr_link_ee" self.robot = load_robot(lbr_med_num, [0, 1]) @@ -77,6 +87,28 @@ def __init__(self, lbr_med_num): self.solution = None def __call__(self, qc, vg, dt): + """ + Calls the solver to obtain an inverse kinematics solution for a given robot + configuration, visual geometry and time step. If successful, it returns + the desired joint angles; otherwise, it prints a warning and returns the + original configuration. + + Args: + qc (optas.Tensor): Used to initialize the solver's initial state, when + no solution is provided. It appears to represent control pulses, + possibly for quantum systems. Its shape and values are concatenated + horizontally with itself. + vg (float | np.ndarray): Used to reset parameters for the solver, + likely representing a time step or grid spacing value within a + numerical method or algorithm. + dt (float): Used as an input to the solver, likely representing time + steps or other temporal units relevant to the solution being computed. + + Returns: + ndarray|str: A flattened numpy array or the input parameter `qc`, in + case the solver fails to find a solution, indicating a warning condition. + + """ # Reset initial seed with previous solution if self.solution is not None: self.solver.reset_initial_seed(self.solution) diff --git a/examples/joint_teleop.py b/examples/joint_teleop.py index 79609f4..98ffdf0 100644 --- a/examples/joint_teleop.py +++ b/examples/joint_teleop.py @@ -1,11 +1,11 @@ import sys -# FRI Client: https://github.com/cmower/FRI-Client-SDK_Python -import pyFRI as fri - # PyGame: https://www.pygame.org/news import pygame +# FRI Client: https://github.com/cmower/FRI-Client-SDK_Python +import pyFRI as fri + pygame.init() # NumPy: https://numpy.org/ @@ -13,7 +13,37 @@ class Keyboard: + """ + Initializes a Pygame window and controls a virtual joint with key presses. It + can be turned on or off by pressing numbered keys, and its direction is + controlled by left and right arrow keys, with an escape key for exit. + + Attributes: + max_joint_velocity (float): Used to represent the maximum allowed velocity + for a joint, set to 5 degrees radian when initialized using the + `np.deg2rad(5)` function. + joint_index (NoneType|int): Initialized to None in the __init__ method. + It stores the index of the currently active joint, which ranges from + 1 to 7 (inclusive) when one of the number keys 1-7 is pressed. + joint_velocity (float): 0 by default. It accumulates and stores the net + velocity change applied to a joint based on the direction keys pressed, + bounded within a maximum limit defined by `max_joint_velocity`. + key_to_dir_map (Dict[int,float]): Initialized with a dictionary that maps + keys from Pygame's constant for left arrow to a float value of 1.0, + and right arrow key to -1.0. + key_joint_map (List[pygameK_]|pygameK_ESCAPE): 7 elements long, which maps + keyboard keys to their corresponding joints. It stores a list of + integers representing the key codes of the numbered keys on the keyboard + (1-7). + + """ def __init__(self): + """ + Initializes several attributes for handling joint movements using keyboard + input, including display settings, joint velocity limits, and mapping key + presses to direction and joint values. + + """ pygame.display.set_mode((300, 300)) self.max_joint_velocity = np.deg2rad(5) @@ -37,6 +67,15 @@ def __init__(self): ] def __call__(self): + """ + Manages user input from Pygame events. It handles window close, keyboard + key presses and releases, and updates joint velocity accordingly. + + Returns: + tuple[int,float]: A pair containing an integer indicating the currently + selected joint index and a float representing the scaled joint velocity. + + """ for event in pygame.event.get(): if event.type == pygame.QUIT: # User closed pygame window -> shutdown @@ -74,7 +113,32 @@ def __call__(self): class TeleopClient(fri.LBRClient): + """ + Enables teleoperation of a robot using keyboard input. It monitors the robot's + state, waits for user commands, and sends control commands to the robot based + on user input, allowing for real-time joint position or torque control. + + Attributes: + keyboard (Callable[[],Tuple[int|None,float]]): Likely a method that returns + the index of the joint to be controlled or None if no control is + desired, along with the velocity goal for that joint. + torques (npndarray[float32]|None): Initialized as a NumPy array of zeros + with length fri.LBRState.NUMBER_OF_JOINTS when the instance is created. + It represents joint torques in Newton meters. + + """ def __init__(self, keyboard): + """ + Initializes an instance with a keyboard object and sets up attributes to + store the keyboard and torques for the LBR robot, initializing the torques + array with zeros. + + Args: + keyboard (object): Assigned to the instance variable `self.keyboard`. + Its purpose is not specified within the code, but it may represent + an external input device or interface for user interaction. + + """ super().__init__() self.keyboard = keyboard self.torques = np.zeros(fri.LBRState.NUMBER_OF_JOINTS) @@ -83,6 +147,21 @@ def monitor(self): pass def onStateChange(self, old_state, new_state): + """ + Prints state changes and handles transitions to specific states, notably + when the robot reaches the MONITORING_READY state. It resets certain + attributes and displays instructions for controlling the robot joints using + keyboard inputs. + + Args: + old_state (Union[fri.ESessionState, fri.ESessionState | str]): Apparently + an enumeration object or a string representing the previous state + of the session. + new_state (fri.ESessionState): Used to store the current state of the + session. In this specific case, it can be one of the enumeration + values defined under fri.ESessionState. + + """ print(f"State changed from {old_state} to {new_state}") if new_state == fri.ESessionState.MONITORING_READY: @@ -100,6 +179,12 @@ def onStateChange(self, old_state, new_state): ) def waitForCommand(self): + """ + Retrieves the current joint position from the robot and sets it as the new + target position for the joints, updating the torque command if the client + is in torque mode. + + """ self.q = self.robotState().getIpoJointPosition() self.robotCommand().setJointPosition(self.q.astype(np.float32)) @@ -107,6 +192,13 @@ def waitForCommand(self): self.robotCommand().setTorque(self.torques.astype(np.float32)) def command(self): + """ + Receives user input from the keyboard and updates the robot's joint positions + based on velocity goals, before sending updated position commands to the + robot's control interface. It also sets torque values when operating in + torque mode. + + """ joint_index, vgoal = self.keyboard() if isinstance(joint_index, int): @@ -119,6 +211,17 @@ def command(self): def get_arguments(): + """ + Parses command-line arguments using `argparse`. It defines two optional + arguments: `hostname` and `port`, which are used to specify communication + settings with a KUKA Sunrise Controller. The parsed arguments are then returned + as an object. + + Returns: + argparseNamespace: An object containing the parsed command line arguments. + This includes the values for the specified options such as hostname and port. + + """ parser = argparse.ArgumentParser(description="LRBJointSineOverlay example.") parser.add_argument( "--hostname", @@ -138,7 +241,18 @@ def get_arguments(): def main(): - print("Running FRI Version:", fri.FRI_VERSION) + """ + Initializes and connects to a KUKA Sunrise controller, establishing a teleoperation + session using keyboard input and prints messages indicating success or failure + at each stage. It then enters a loop where it repeatedly checks for the robot's + state until idle or interrupted. + + Returns: + int: 1 if connection to KUKA Sunrise controller fails and 0 otherwise. + This indicates whether the program executed successfully or not. + + """ + print("Running FRI Version:", fri.FRI_CLIENT_VERSION) args = get_arguments() keyboard = Keyboard() diff --git a/examples/task_teleop.py b/examples/task_teleop.py index 98331a3..d7d18d4 100644 --- a/examples/task_teleop.py +++ b/examples/task_teleop.py @@ -1,13 +1,13 @@ -import sys import argparse +import sys from collections import OrderedDict -# FRI Client: https://github.com/cmower/FRI-Client-SDK_Python -import pyFRI as fri - # PyGame: https://www.pygame.org/news import pygame +# FRI Client: https://github.com/cmower/FRI-Client-SDK_Python +import pyFRI as fri + pygame.init() # NumPy: https://numpy.org/ @@ -20,6 +20,11 @@ def print_instructions(): + """ + Prints a set of instructions to control a robot using direction keys and task + axes on the PyGame window. + + """ print("\n") print("-" * 65) print("-- Control robot joints using LEFT/RIGHT direction keys. --") @@ -29,7 +34,39 @@ def print_instructions(): class Keyboard: + """ + Handles keyboard events to control tasks with velocities. It maps keys to task + indices and velocities, enabling task activation/deactivation and velocity + adjustments using arrow keys or dedicated key shortcuts. It also supports quit + event handling. + + Attributes: + max_task_velocity (float): 0.04 by default. It represents the maximum + velocity for a task, which can be scaled down using the `self.task_velocity` + attribute when keys are held to control task movement speed. + task_index (None|int): Set to a specific key when it is pressed and its + corresponding task label is turned ON or OFF, otherwise, its value + remains unchanged until the corresponding key is pressed again. + task_velocity (float): 0.0 by default. It accumulates changes based on + keyboard input, indicating how fast a task should be executed with + positive values representing increasing speed and negative values + representing decreasing speed. + key_to_dir_map (Dict[pygameK_LEFT,float]|Dict[pygameK_RIGHT,float]): + Initialized with mappings from pygame keyboard keys to floating point + numbers representing direction values. The map assigns a value of 1.0 + to the LEFT key and -1.0 to the RIGHT key. + key_task_map (OrderedDict[pygameK_x|pygameK_y||pygameK_a,str]): Initialized + to store key-value pairs where keys are specific keyboard keys (e.g., + pygame.K_x) and values are corresponding task labels (e.g., "x"). + + """ def __init__(self): + """ + Initializes game window settings, defines task velocity and mapping, sets + initial key-task mappings, and establishes relationships between keyboard + keys and their corresponding tasks or directions. + + """ pygame.display.set_mode((300, 300)) self.max_task_velocity = 0.04 @@ -51,6 +88,18 @@ def __init__(self): self.key_task_map[pygame.K_a] = "rz" def __call__(self): + """ + Handles Pygame events to control tasks and task velocity by keyboard input, + allowing users to turn tasks on/off with specific keys and adjust + direction/velocity with others. It also exits the program upon QUIT or + ESCAPE key press. + + Returns: + Tuple[int,float]: A tuple containing two values: the current task index + and the scaled velocity of the task. The task index and velocity are + computed based on user input events. + + """ for event in pygame.event.get(): if event.type == pygame.QUIT: # User closed pygame window -> shutdown @@ -89,7 +138,36 @@ def __call__(self): class TeleopClient(fri.LBRClient): + """ + Handles robot teleoperation, monitoring its state and sending commands to the + robot based on user input from a keyboard or other interface. It implements a + kinematic inverse solution for joint position updates. + + Attributes: + ik (Callable[[npndarray,npndarray,float],npndarray]): Used to perform + inverse kinematics calculations, mapping desired joint velocities to + joint positions. Its implementation is not provided in this code snippet. + keyboard (Keyboard|friLBRClient): Used to get tasks from a keyboard input, + where it returns a task index and velocity goal for each key pressed. + It appears to be responsible for translating user input into robot commands. + torques (npndarray[float32]): Initialized with zeros, representing the + torques applied to each joint of a robot arm. It is used to set torque + commands when the client command mode is set to TORQUE. + + """ def __init__(self, ik, keyboard): + """ + Initializes an instance with input kinematics (ik) and keyboard controls. + It sets up the LBR state, storing zeros for torques, indicating no initial + torque. + + Args: + ik (fri.IK): Initialized as an attribute `self.ik`. The exact nature + of this IK object is not provided, but it is assumed to be related + to inverse kinematics calculations. + keyboard (object): Assigned to an instance variable named `self.keyboard`. + + """ super().__init__() self.ik = ik self.keyboard = keyboard @@ -99,6 +177,19 @@ def monitor(self): pass def onStateChange(self, old_state, new_state): + """ + Prints state change notifications and updates internal variables when the + client enters a monitoring ready state, including resetting torque values + and queue. + + Args: + old_state (ESessionState | str): Used to store the previous state of + the session before it changes. + new_state (fri.ESessionState | Enum): Used to determine whether an + instruction should be printed, specifically when it reaches the + 'MONITORING_READY' state. + + """ print(f"State changed from {old_state} to {new_state}") if new_state == fri.ESessionState.MONITORING_READY: @@ -108,6 +199,13 @@ def onStateChange(self, old_state, new_state): print_instructions() def waitForCommand(self): + """ + Synchronizes joint positions and torque values with the robot's state, + ensuring that any commands sent by the client are applied to the robot's + current configuration. It sets joint positions and torque values according + to the provided data types. + + """ self.q = self.robotState().getIpoJointPosition() self.robotCommand().setJointPosition(self.q.astype(np.float32)) @@ -115,6 +213,12 @@ def waitForCommand(self): self.robotCommand().setTorque(self.torques.astype(np.float32)) def command(self): + """ + Updates the robot's joint positions and torques based on user input from + the keyboard, utilizing inverse kinematics to compute optimal joint values + for a specific task index and corresponding velocity goal. + + """ task_index, vgoal = self.keyboard() if isinstance(task_index, int): @@ -130,6 +234,16 @@ def command(self): def get_arguments(): + """ + Parses command line arguments using the argparse module to provide a structured + way to retrieve input parameters for a KUKA Sunrise Controller communication + script, such as hostname, port, and LBR Med version number. + + Returns: + argparseNamespace: An object containing attributes for each argument passed + to it. + + """ parser = argparse.ArgumentParser(description="LRBJointSineOverlay example.") parser.add_argument( "--hostname", @@ -157,7 +271,17 @@ def get_arguments(): def main(): - print("Running FRI Version:", fri.FRI_VERSION) + """ + Establishes a connection to a KUKA Sunrise controller, initializes an application + using the client and keyboard, and enters a loop where it continuously steps + through the application until a disconnect or idle session state is encountered. + + Returns: + int|None: 0 on successful execution and 1 if connection to KUKA Sunrise + controller fails. + + """ + print("Running FRI Version:", fri.FRI_CLIENT_VERSION) args = get_arguments() ik = IK(args.lbr_ver) diff --git a/pyFRI/tools/state_estimators.py b/pyFRI/tools/state_estimators.py index 829f500..6d80092 100644 --- a/pyFRI/tools/state_estimators.py +++ b/pyFRI/tools/state_estimators.py @@ -10,7 +10,6 @@ class JointStateEstimator: - """ JointStateEstimator @@ -29,6 +28,16 @@ def __init__( self, client, ): + """ + Intercepts incoming commands sent by the client, updates the estimator's + internal window data structures as necessary before executing the original + command. + + Args: + client (object): Expected to have an attribute or method named `command`, + which is replaced with the decorated version in this code. + + """ # Set class attributes/variables self._client = client self._first_update = True @@ -40,6 +49,16 @@ def __init__( orig_command = self._client.command def command(*args, **kwargs): + """ + Wraps another function called `orig_command`. It updates a window + before calling `orig_command`, allowing modifications to be made to + the state of the application before executing the original command. + + Args: + *args (list): List of positional arguments + **kwargs (dict): Dictionary of keyword arguments + + """ self._update_window() orig_command(*args, **kwargs) @@ -55,6 +74,11 @@ def ddq(self, idx): return np.array(self._ddq[idx]) def _update_window(self): + """ + Accumulates data from robot state messages, estimating joint velocity and + acceleration by computing differences between consecutive measurements. + + """ # Retrieve state dt = self._client.robotState().getSampleTime() q = self._client.robotState().getMeasuredJointPosition().flatten().tolist() @@ -85,7 +109,6 @@ def get_acceleration(self): class TaskSpaceStateEstimator: - """ TaskSpaceStateEstimator @@ -100,6 +123,29 @@ class TaskSpaceStateEstimator: def __init__( self, client, joint_state_estimator, robot_model, ee_link, base_link=None ): + """ + Initializes an instance by setting client and joint state estimator + properties, and computing functions to transform link coordinates and + calculate the geometric Jacobian based on a robot model and end-effector + or specified base link. + + Args: + client (object): Assigned to an instance variable `_client`. No further + information about its purpose or functionality is provided within + this code snippet. + joint_state_estimator (object): Used to estimate joint states likely + from sensor data or other sources and can be expected to produce + values which are passed into various functions within the class. + robot_model (object): Used to access methods related to transforms and + Jacobians of robot links based on its properties. It provides + functions to compute global link transform and geometric Jacobian. + ee_link (str | object): Required as it represents the end effector or + the link of interest of the robot model. + base_link (None | str): Optional by default. It specifies the base + link to which the end-effector's transform and Jacobian are + referenced, if not specified globally through the robot model. + + """ self._client = client self._joint_state_estimator = joint_state_estimator @@ -122,16 +168,49 @@ def __init__( raise ValueError(f"{base_link=} was not recognized") def get_transform(self): + """ + Retrieves the current position from a joint state estimator and returns a + transformation matrix calculated from that position using the `_T(q)` function. + + Returns: + Transform: A representation of the current position of the robot, + computed from its joint state estimator. It calls another internal + method `_T(q)` that likely applies some transformation based on the + input pose `q`. + + """ q = self._joint_state_estimator.get_position() return self._T(q) def get_velocity(self): + """ + Computes the velocity in the task space by taking the product of the + Jacobian matrix and the joint velocities. This allows for the transformation + of joint space velocities to task space velocities. + + Returns: + numpyndarray: A vector representing the velocity of the system, + calculated as the product of the Jacobian matrix and the joint velocities. + The returned object has dimensions equal to those of self._J(q). + + """ q = self._joint_state_estimator.get_position() dq = self._joint_state_estimator.get_velocity() J = self._J(q) return J @ dq def get_acceleration(self): + """ + Calculates the acceleration of an end-effector in a robot's task space by + estimating joint velocities and accelerations from state estimators, + differentiating them with respect to time, and computing their difference + over a sample period. + + Returns: + ndarray: Estimated acceleration of the robot, computed as a change in + velocity over time. + + """ # Retreive joint states q = self._joint_state_estimator.q(-1) qp = self._joint_state_estimator.q(-2) @@ -148,12 +227,34 @@ def get_acceleration(self): class ExternalTorqueEstimator(abc.ABC): + """ + Provides an abstract interface for estimating external torques, which are + forces that act on a system from outside its boundaries. It defines a single + abstract method, `get_external_torque`, to be implemented by concrete subclasses. + + """ @abc.abstractmethod def get_external_torque(self): + """ + Returns the estimated external torque value, which is assumed to be + implemented by concrete subclasses that inherit from this abstract base class. + + """ pass class FRIExternalTorqueEstimator(ExternalTorqueEstimator): + """ + Estimates external torques on a robot based on data from an FRI (Fieldbus + Remote I/O) client, which interacts with a robotic system to retrieve state + information, including the estimated external torque experienced by the robot. + + Attributes: + _client (RobotStateClient|object): Assigned a value in the `__init__` + method. It represents a client object that interacts with a robot's + state, used to retrieve external torque information. + + """ def __init__(self, client): self._client = client @@ -167,7 +268,6 @@ def get_external_torque(self): class WrenchEstimator(abc.ABC): - """ WrenchEstimator @@ -194,6 +294,35 @@ def __init__( base_link=None, n_data=50, ): + """ + Initializes its internal state by setting various attributes, including + estimators for joint and external torques, a robot model, and Jacobian + functions, depending on input parameters such as base link and data length. + + Args: + client (object | ClientType): Required. Its specific role or purpose + within the class is not explicitly described, suggesting it may + be a dependency for other functionality. + joint_state_estimator (object): Not explicitly defined within this + code snippet. However, it is expected to be an estimator capable + of providing joint state estimates. + external_torque_estimator (object): Stored as an attribute named + `_external_torque_estimator`. Its exact usage depends on its + implementation, but it likely estimates external torques affecting + the robot. + robot_model (object): Used to calculate geometric Jacobians for a robot + model, likely an instance of a class that provides methods to + compute kinematic quantities. + tip_link (str | int): Expected to be a reference to the end-effector + or last link of the robot arm, which is used by some methods of + the class. + base_link (str | NoneType): Used to specify the base link of the + Jacobian calculation. If not provided, it defaults to None, resulting + in global Jacobian calculation. + n_data (int): 50 by default. It determines the number of data points + to be stored internally for processing. + + """ # Create class attributes self._joint_state_estimator = joint_state_estimator self._external_torque_estimator = external_torque_estimator @@ -218,6 +347,13 @@ def __init__( self._data = [] def _inverse_jacobian(self): + """ + Computes an approximate inverse of the Jacobian matrix associated with the + current joint configuration, represented by q. This is achieved through + the use of NumPy's pinv function with a specified condition number threshold + (rcond). + + """ q = self._joint_state_estimator.get_position() return np.linalg.pinv(self._jacobian(q), rcond=self._rcond) @@ -225,20 +361,39 @@ def ready(self): return len(self._data) >= self._n_data def update(self): + """ + Checks if the available data is less than the required data threshold + `_n_data`. If so, it calls the `_update_data` method to fetch or generate + additional data. This ensures that the estimator has sufficient data before + proceeding with calculations. + + """ if len(self._data) < self._n_data: self._update_data() @abc.abstractmethod def _update_data(self): + """ + Defines an abstract operation that must be implemented by concrete subclasses, + providing a way to update internal data without being called directly from + outside the class hierarchy. It is intended for internal use within the + class or its subclasses. + + """ pass @abc.abstractmethod def get_wrench(self): + """ + Returns an estimate of a wrench, but its specific implementation is not + provided and must be defined by a subclass, making it an abstract method + according to ABC protocol. + + """ pass class WrenchEstimatorJointOffset(WrenchEstimator): - """ WrenchEstimatorJointOffset @@ -250,10 +405,27 @@ class WrenchEstimatorJointOffset(WrenchEstimator): """ def _update_data(self): + """ + Updates the internal data list with the estimated external torque calculated + by the `tau_ext` variable, which is retrieved from an estimator object and + converted to a list before being appended. + + """ tau_ext = self._external_torque_estimator.get_external_torque() self._data.append(tau_ext.tolist()) def get_wrench(self): + """ + Calculates an estimated wrench acting on a joint, accounting for external + torques and a joint offset. It returns the product of the inverse Jacobian + matrix transpose and the net external torque. + + Returns: + numpyndarray: 2D matrix representing a wrench at the end-effector, + calculated by taking the dot product of Jacobian inverse transpose and + the external torque minus offset. + + """ offset = np.mean(self._data, axis=0) tau_ext = self._external_torque_estimator.get_external_torque() - offset Jinv = self._inverse_jacobian() @@ -261,7 +433,6 @@ def get_wrench(self): class WrenchEstimatorTaskOffset(WrenchEstimator): - """ WrenchEstimatorTaskOffset @@ -273,11 +444,28 @@ class WrenchEstimatorTaskOffset(WrenchEstimator): """ def _update_data(self): + """ + Updates internal data with estimated external wrenches, computing the + wrench as the product of the inverse jacobian and an estimate of the + external torque. The result is appended to a list as a flat vector. + + """ tau_ext = self._external_torque_estimator.get_external_torque() f_ext = self._inverse_jacobian().T @ tau_ext self._data.append(f_ext.flatten().tolist()) def get_wrench(self): + """ + Calculates an estimated wrench (force + torque) by applying an inverse + Jacobian transformation to an external torque estimate and subtracting an + offset calculated from mean data. + + Returns: + numpyndarray: A wrench (moment + force) applied by the robot's end + effector, calculated as the difference between the external torque and + the offset derived from the data. + + """ offset = np.mean(self._data, axis=0) tau_ext = self._external_torque_estimator.get_external_torque() return self._inverse_jacobian().T @ tau_ext - offset diff --git a/setup.py b/setup.py index 1963da8..426e73b 100644 --- a/setup.py +++ b/setup.py @@ -1,33 +1,29 @@ import os import re -import sys import subprocess +import sys from pathlib import Path -from setuptools import Extension, setup, find_packages + +from setuptools import Extension, find_packages, setup from setuptools.command.build_ext import build_ext # Read the configuration settings class UserInputRequired(Exception): + """ + Defines a custom exception to handle cases where user input is necessary for + further processing. It extends Python's built-in `Exception` class, allowing + it to be used as an exception type in error handling scenarios. + + """ def __init__(self, msg): super().__init__(msg) -FRI_VERSION = None -try: - from fri_config import FRI_VERSION -except ImportError: - pass - -if FRI_VERSION is None: - STARTC = "\033[91m" - ENDC = "\033[0m" +FRI_CLIENT_VERSION = os.environ.get("FRI_CLIENT_VERSION") +if FRI_CLIENT_VERSION is None: raise UserInputRequired( - "\n\n" - + STARTC - + ">> FRI_VERSION not set in fri_config.py, refer to the Install section in README.md. <<" - + ENDC - + "\n" + "Please set the environment variable FRI_CLIENT_VERSION to the version of the FRI Client SDK you are using." ) # Convert distutils Windows platform specifiers to CMake -A arguments @@ -43,13 +39,56 @@ def __init__(self, msg): # The name must be the _single_ output extension from the CMake build. # If you need multiple extensions, see scikit-build. class CMakeExtension(Extension): + """ + Initializes a CMake extension with a given name and source directory path. It + sets up a base extension instance and resolves the source directory path to + an absolute file system path, ensuring it's usable within the Python environment. + + Attributes: + sourcedir (Path|str): Initialized with a directory path that resolves to + an absolute file system path using `os.fspath(Path(sourcedir).resolve())`. + + """ def __init__(self, name: str, sourcedir: str = "") -> None: + """ + Initializes an instance with specified name and sourcedir. If sourcedir + is empty, it defaults to an empty string. It calls the parent class's + constructor, passing name and an empty list of sources. It then resolves + the sourcedir path using os.fspath and Path. + + Args: + name (str): Required (no default value specified) for initializing the + object. It serves as an identifier or label for the object, likely + used to reference it later in the program. + sourcedir (str): Optional, indicated by its default value being an + empty string "". It specifies the directory from which sources + will be taken. The os.fspath and Path.resolve() are used to resolve + it. + + """ super().__init__(name, sources=[]) self.sourcedir = os.fspath(Path(sourcedir).resolve()) class CMakeBuild(build_ext): + """ + Builds CMake extensions for Python packages by executing a series of commands + to configure and build the extension using CMake. It handles various environment + variables and generator configurations for different platforms and compilers. + + """ def build_extension(self, ext: CMakeExtension) -> None: + """ + Builds a C++ extension using cmake, handling various configuration options + and environment variables to generate and compile the extension for different + platforms and architectures. + + Args: + ext (CMakeExtension): Not explicitly described in the code snippet + provided. However, based on its use in the code, it appears to be + an object representing a CMake extension that needs to be built. + + """ # Must be in this form due to bug in .resolve() only fixed in Python 3.10+ ext_fullpath = Path.cwd() / self.get_ext_fullpath(ext.name) extdir = ext_fullpath.parent.resolve() @@ -134,7 +173,10 @@ def build_extension(self, ext: CMakeExtension) -> None: build_args += [f"-j{self.parallel}"] # Set the FRI version number - cmake_args += [f"-DFRI_VERSION={FRI_VERSION}"] + fri_ver_major = FRI_CLIENT_VERSION.split(".")[0] + fri_ver_minor = FRI_CLIENT_VERSION.split(".")[1] + cmake_args += [f"-DFRI_CLIENT_VERSION_MAJOR={fri_ver_major}"] + cmake_args += [f"-DFRI_CLIENT_VERSION_MINOR={fri_ver_minor}"] build_temp = Path(self.build_temp) / ext.name if not build_temp.exists(): @@ -151,8 +193,8 @@ def build_extension(self, ext: CMakeExtension) -> None: setup( name="pyFRI", version="1.2.0", - author="Christopher E. Mower", - author_email="christopher.mower@kcl.ac.uk", + author="Christopher E. Mower, Martin Huber", + author_email="christopher.mower@kcl.ac.uk, m.huber_1994@hotmail.de", description="Python bindings for the FRI Client SDK library.", long_description="", packages=find_packages(), From 9605c83d6f1aced75ca733569422c757b85c3b7d Mon Sep 17 00:00:00 2001 From: "komment-ai[bot]" <122626893+komment-ai[bot]@users.noreply.github.com> Date: Sun, 8 Sep 2024 16:25:10 +0000 Subject: [PATCH 3/3] Added comments to 82 items across 9 files --- examples/LBRJointSineOverlay.py | 145 +++++----- examples/LBRTorqueSineOverlay.py | 140 +++++---- examples/LBRWrenchSineOverlay.py | 148 +++++----- examples/hand_guide.py | 99 ++++--- examples/joint_teleop.py | 149 +++++----- examples/task_teleop.py | 165 +++++------ pyfri/tools/filters.py | 136 +++++++++ pyfri/tools/state_estimators.py | 467 +++++++++++++++++++++++++++++++ setup.py | 63 +++-- 9 files changed, 1054 insertions(+), 458 deletions(-) create mode 100644 pyfri/tools/filters.py create mode 100644 pyfri/tools/state_estimators.py diff --git a/examples/LBRJointSineOverlay.py b/examples/LBRJointSineOverlay.py index 504cea4..d44fa2e 100644 --- a/examples/LBRJointSineOverlay.py +++ b/examples/LBRJointSineOverlay.py @@ -6,54 +6,54 @@ import numpy as np import pandas as pd -import pyFRI as fri +import pyfri as fri class LBRJointSineOverlayClient(fri.LBRClient): """ - Simulates a sine wave overlay on specific joints of a robot, with adjustable - frequency, amplitude, and filtering. It monitors state changes to update its - parameters and applies the simulated movement when ready. It also waits for - commands and adjusts joint positions accordingly. + Generates a sinusoidal joint position offset for specific joints on an industrial + robot, controlled by a sine wave with adjustable frequency and amplitude, + filtering the output to smooth out oscillations. Attributes: - joint_mask (numpyndarray|int): Used to select a subset of joints for which - sine overlay is applied, ignoring others. - freq_hz (float): Initialized by the user when creating an instance of the - class. It represents a frequency value in Hertz that determines the - oscillation period of a sine wave overlay for joint positions. - ampl_rad (float): Used to scale the sine wave offset values in radians. - It controls the amplitude of the sinusoidal motion added to the joint - positions. - filter_coeff (float): Used as a coefficient for a simple exponential - smoothing filter to smooth out the oscillation amplitude. - offset (float): Initialized to 0. It represents a filtered sine wave offset - value used to update joint positions during the command method. - phi (float): Used as a phase angle in a mathematical function, specifically - the sine function. It increments by `self.step_width` each time the - command method is called. - step_width (float): 0 by default. It calculates the angular step for sine - wave overlay based on frequency, sample time, and is updated in `onStateChange`. + joint_mask (numpyndarray|int): Used to specify which joints of a robot are + affected by the sine wave overlay. + freq_hz (float): Intended to represent the frequency of a sine wave, + measured in Hertz (Hz). It appears to be used for generating a sinusoidal + offset. + ampl_rad (float): A parameter controlling the amplitude (in radians) of + the sine wave used to modulate joint positions. It determines the + maximum deviation from the base joint position. + filter_coeff (float): 0 by default, used for low-pass filtering the offset + value of the sine wave, effectively reducing its amplitude over time. + offset (float): Initialized to 0.0. It represents a value used to modify + joint positions based on a sine wave pattern, filtered over time with + a specified coefficient. + phi (float): Initialized to zero. It increments by a calculated step width + during each command execution, simulating rotation at the specified frequency. + step_width (float): Set in the `onStateChange` method to twice pi times + the frequency (in Hz) times the sample time, essentially defining a + phase increment for each sampling step. """ def __init__(self, joint_mask, freq_hz, ampl_rad, filter_coeff): """ - Initializes an object with specified parameters, including joint mask, - frequency, amplitude, and filter coefficient, as well as additional - attributes for offset, phase, and step width. + Initializes its attributes, including joint mask, frequency, amplitude, + filter coefficient, offset, phase angle, and step width from input parameters. + It calls the parent class's `__init__` method using `super().__init__()`. Args: - joint_mask (bool | np.ndarray): 2-dimensional. It represents joint - (common) mask, likely used to specify which elements are valid for - further calculations or filtering processes. - freq_hz (float): Expected to be set to a value representing frequency - in hertz. It initializes an instance variable `freq_hz` of the class. - ampl_rad (float): 0.0 by default. It appears to represent an amplitude - value in radians, suggesting it could be related to oscillatory - behavior or wave properties. - filter_coeff (float | int): Assigned from an unknown source. Its purpose - is unclear without more context, but its name suggests it could - be related to filter coefficients used in signal processing. + joint_mask (np.ndarray | List[int]): 2D with shape (n_joints, n_joints), + representing the adjacency matrix for joints used to connect and + filter movement data. + freq_hz (float): Required for initialization, specifying the frequency + in Hertz. + ampl_rad (float): 3rd argument passed to this method, it represents + the amplitude of an oscillation in radians. It seems to be related + to angular measurements in a sinusoidal signal. + filter_coeff (float): Set by the user when creating an instance of + this class. It likely represents a coefficient used for filtering + data, possibly as part of a low-pass or high-pass filter calculation. """ super().__init__() @@ -70,15 +70,17 @@ def monitor(self): def onStateChange(self, old_state, new_state): """ - Resets certain parameters when the robot transitions to the 'MONITORING_READY' - state, and prints a message indicating the change in state. + Updates its internal state when the robot's session state changes to + MONITORING_READY, resetting certain parameters such as offset, phi, and + step width based on the robot's frequency and sample time. Args: - old_state (str | Enum): Described by the documentation as "the previous - state". It represents the state of the object before it was changed. - new_state (Enum[fri.ESessionState]): Passed to the function when the - state of an object changes. The specific value depends on the - context in which the function is called. + old_state (fri.ESessionState): Used to store the previous state of an + object or system before its current state changes. Its value is + passed as an argument from another function call. + new_state (Enum[fri.ESessionState]): Updated to MONITORING_READY on + state change, which triggers resetting several attributes of the + class instance. """ print(f"Changed state from {old_state} to {new_state}") @@ -91,9 +93,9 @@ def onStateChange(self, old_state, new_state): def waitForCommand(self): """ - Sets the joint position of the robot based on the current Ipo joint position, - converting it to a float32 format before applying it. It appears to be - waiting for a command from an external source or system. + Updates the robot's joint positions using values retrieved from an IPO + (In-Place Optimization) calculation, converting them to float32 data type + before applying them. """ self.robotCommand().setJointPosition( @@ -102,9 +104,9 @@ def waitForCommand(self): def command(self): """ - Updates the offset for joint movements, calculates the new position based - on the updated offset and applies it to the robot's joints. It uses a - low-pass filter to smoothly transition between new and previous offsets. + Updates the offset value based on filtered sine wave input, adjusts the + phase angle to ensure it remains within a specific range, and sends the + updated joint position command to the robot. """ new_offset = self.ampl_rad * math.sin(self.phi) @@ -119,34 +121,31 @@ def command(self): self.robotCommand().setJointPosition(joint_pos.astype(np.float32)) -def get_arguments(): +def args_factory(): """ - Parses command-line arguments for a program that communicates with a KUKA - Sunrise Controller, including hostname, port number, joint mask, frequency, - amplitude, filter coefficient, and data saving flag. It uses type conversion - and error checking to validate input values. + Creates an argument parser for a program that interacts with a KUKA Sunrise + Controller, accepting various parameters such as hostname, port number, joint + mask, frequency and amplitude of a sine wave, filter coefficient, and data + saving flag. Returns: - argparseNamespace: An object containing all the command line arguments - parsed by ArgumentParser. This object has attributes for each argument - added to the parser. + argparseNamespace: An object that stores the parsed command line arguments + as attributes. It encapsulates all argument values provided to the script. """ def cvt_joint_mask(value): """ - Converts a given value into an integer that represents a joint mask and - checks if it falls within the specified valid range (0 to 6). If not, it - raises an error with a descriptive message indicating the invalid value - and the acceptable range. + Converts a given input value to an integer and checks if it falls within + the specified range (0-6 inclusive). If valid, it returns the integer; + otherwise, it raises an error with a descriptive message. Args: - value (Union[int, str]): Expected to be either an integer within the - specified range or a string that can be converted to an integer - within this range. + value (Union[int, str]): Expected to be within the range of integer + values from 0 to 7 exclusive. Returns: - int|None: A integer value representing joint mask index from 0 to 6 - if valid input is provided otherwise it raises an ArgumentTypeError exception. + int|None: Integer value within the range [0, 7) or raises an error if + not within this specified range. """ int_value = int(value) @@ -210,19 +209,19 @@ def cvt_joint_mask(value): def main(): """ - Initializes a KUKA Sunrise controller connection, runs an LBRJointSineOverlay - client application, collects data when specified, and plots it as a 4x1 subplot - after disconnection, displaying measured and IPO positions along with torque - values over time. + Initializes a KUKA Sunrise controller client, collects data and performs a + joint sine overlay test. It handles connections, disconnections, and data + saving, then displays a plot of collected data using matplotlib. Returns: - int|None: 0 on successful completion and non-zero if connection to KUKA - Sunrise controller fails. + int|None: 0 when no exceptions occur and when the application runs + successfully, otherwise it returns a non-zero value, specifically 1 if the + connection to KUKA Sunrise controller fails. """ print("Running FRI Version:", fri.FRI_CLIENT_VERSION) - args = get_arguments() + args = args_factory() client = LBRJointSineOverlayClient( args.joint_mask, args.freq_hz, args.ampl_rad, args.filter_coeff ) diff --git a/examples/LBRTorqueSineOverlay.py b/examples/LBRTorqueSineOverlay.py index 8986ecc..bf61e7c 100644 --- a/examples/LBRTorqueSineOverlay.py +++ b/examples/LBRTorqueSineOverlay.py @@ -4,52 +4,51 @@ import numpy as np -import pyFRI as fri +import pyfri as fri class LBRTorqueSineOverlayClient(fri.LBRClient): """ - Implements a client for Lab Robot (LBR) torque control with a sine wave overlay - on specified joints. It monitors state changes, waits for commands, and sends - joint position and torque commands to the robot based on user-defined frequency - and amplitude parameters. + Implements a client for an LBR robot, specifically designed to apply sinusoidal + torques to joints masked by the `joint_mask` attribute. It overrides default + behavior to control joint positions and torques during monitoring and command + phases. Attributes: - joint_mask (Sequence[int]): Initialized in the constructor method, but its - exact nature or how it is used is unclear from this code snippet alone. - freq_hz (float): Initialized with a frequency value that will be used to - generate a sine wave for the torque command. It determines the number - of cycles per second in the sinusoidal pattern. - torque_ampl (float): Initialized with the torque amplitude value provided - to its constructor. It stores the maximum torque magnitude applied to - joints during oscillation. - phi (float): Initialized to zero. It represents a phase angle used for - generating sine wave amplitudes in the `command` method. - step_width (float): 0 by default. It represents the time increment for - updating the torque offset based on the sine wave frequency and sample - time. - torques (npndarray[float32,ndim1]): 0-indexed with a length equal to - `fri.LBRState.NUMBER_OF_JOINTS`, representing a numerical array - containing torques applied to each joint. + joint_mask (Sequence[int]): Used to specify which joints are subject to + the sinusoidal torque command. It appears to be a subset of the robot's + total number of joints. + freq_hz (float): Initialized with a specific value during object creation, + which represents the frequency of the sine wave used to modulate the + torques applied to the robot's joints. + torque_ampl (float): Initialized by the `__init__` method with the value + provided to it as an argument. It represents the amplitude of the sine + wave used in torque calculations. + phi (float): Initialized to 0. It keeps track of a phase angle used in + calculating torque offsets for sine wave-like motion, incremented by + `self.step_width` on each call to `command`. + step_width (float): 0 by default. It stores the angular step size of a + sine wave used to modulate torque amplitudes, calculated as twice the + product of frequency and sample time. + torques (npndarray[float32,ndim1]): Initialized as a zero-filled array + with the size corresponding to the number of joints of the robot. It + stores torques values for each joint during the command execution. """ def __init__(self, joint_mask, freq_hz, torque_amplitude): """ - Initializes instance variables for joint mask, frequency, torque amplitude, - phase angle, step width, and torques array with zeros, setting default - values where applicable. + Initializes its attributes, including joint mask, frequency, and torque + amplitude, and sets default values for phase, step width, and torques. It + also calls the parent class's constructor using super(). Args: - joint_mask (np.ndarray[bool]): Used to specify which joints are being - controlled by the object. It has boolean values indicating whether - each joint should be included (True) or not (False). - freq_hz (float): Intended to represent a frequency value measured in - Hertz. It is used as an input to initialize the object with its - specified value. - torque_amplitude (float): Assigned to the instance attribute - `self.torque_ampl`. It represents an amplitude value related to - torque, but its precise purpose is not clearly stated in this code - snippet. + joint_mask (np.ndarray | List[int]): 1D array or list containing boolean + values that determine which joints to consider for torque application. + freq_hz (float): Used to represent the frequency in Hertz. It determines + the speed or oscillation rate of an oscillatory signal in this context. + torque_amplitude (float): Used to specify the amplitude of torque. + This value determines the maximum amount of torque that can be + applied during motion. """ super().__init__() @@ -65,17 +64,16 @@ def monitor(self): def onStateChange(self, old_state, new_state): """ - Updates internal state variables based on changes to the robot's session - state. It resets torque values and certain parameters when the session - enters a monitoring-ready state. + Resets internal state when the robot enters the MONITORING_READY state, + setting torque and phi values to zero and recalculating a step width based + on frequency and sample time. Args: - old_state (fri.ESessionState): Passed to indicate the state that - occurred before the change. It represents the current state of the - system or robot session when it transitioned from one state to another. - new_state (fri.ESessionState): Compared to a value MONITORING_READY, - which suggests it represents an enumeration or state machine that - tracks the current session state of a robotic system. + old_state (fri.ESessionState | None): The state of the system before + the change occurred. + new_state (Enum[fri.ESessionState] | Enum[fri.LBRState]): Initialized + with a value from fri.ESessionState or fri.LBRState based on whether + the state changed is due to monitoring ready event or any other event. """ print(f"State changed from {old_state} to {new_state}") @@ -89,8 +87,8 @@ def onStateChange(self, old_state, new_state): def waitForCommand(self): """ - Synchronizes joint positions with the robot's current IPO position, then - sets the robot's torque to a specified value if it is in Torque mode. + Waits for and applies a joint position command to the robot, with or without + torque commands depending on the client's mode. """ self.robotCommand().setJointPosition(self.robotState().getIpoJointPosition()) @@ -100,9 +98,9 @@ def waitForCommand(self): def command(self): """ - Updates the robot's joint position to match its IPO (in-positioning - operation) trajectory and applies a torque offset to specific joints based - on a sinusoidal pattern, with an adjustable amplitude and phase. + Generates torque commands for the robot based on a sine wave pattern, where + the amplitude and phase are modulated by user-defined parameters. It sets + joint positions and torques accordingly to execute this motion command. """ self.robotCommand().setJointPosition(self.robotState().getIpoJointPosition()) @@ -119,35 +117,32 @@ def command(self): self.robotCommand().setTorque(self.torques) -def get_arguments(): +def args_factory(): """ - Parses command line arguments for a KUKA Sunrise Controller program, including - hostname, port number, joint to move, and parameters for sine wave generation. - It validates input using type conversions and range checks, raising exceptions - on invalid values. + Parses command-line arguments using argparse, validating and converting input + values as necessary. It returns a Namespace object containing the parsed + arguments. The function defines type conversion for specific argument types, + including joint masks within a range of 0-7. Returns: - argparseNamespace: A container object holding and providing access to the - arguments passed in as positional and optional (named) arguments parsed - from the command line or other input source. + argparseNamespace: An object that has several named attributes (parsed + arguments) such as hostname, port, joint_mask, freq_hz and torque_amplitude. """ def cvt_joint_mask(value): """ - Converts a given value into an integer that represents a joint mask value - within a specified range (0 to 7). It raises an error if the input is - outside this range. The conversion is implicit for values already in the - valid range. + Converts a given value to an integer and checks if it falls within the + specified range [0, 7). If valid, it returns the integer; otherwise, it + raises an error with a message indicating that the value is out of range. Args: - value (Union[int, str]): Specified as an input to the function when - it is called. It may be either an integer or a string representation - of an integer that can be converted to an integer. + value (Union[int, str] | float): Used to represent a joint mask index. + It can be an integer, float or a string that represents a valid + integer value for a joint mask in Open3D. Returns: - int|None: The index of a joint mask. If the input value is within the - range [0, 7), it returns the integer value. Otherwise, it raises an - ArgumentTypeError with a message describing the valid range. + int: 1 digit integer from 0 to 6 representing joint mask if input is + a valid number in that range, otherwise it raises an error. """ int_value = int(value) @@ -197,19 +192,18 @@ def cvt_joint_mask(value): def main(): """ - Runs a FRI client application, connecting to a KUKA Sunrise controller over a - specified port and hostname. It then enters a loop where it steps through the - application until the session becomes idle or interrupted by a keyboard interrupt. + Initializes and connects to a KUKA Sunrise controller, runs a client application + that applies a sine wave torque overlay to a robot, and exits when the robot + session state becomes IDLE or upon keyboard interrupt. Returns: - int: 1 if connection to KUKA Sunrise controller fails, and 0 otherwise. - The specific return value indicates whether the function executed successfully - or encountered an error. + int|None: 1 if connection to KUKA Sunrise controller fails or 0 otherwise, + indicating successful execution of the application. """ print("Running FRI Version:", fri.FRI_CLIENT_VERSION) - args = get_arguments() + args = args_factory() client = LBRTorqueSineOverlayClient( args.joint_mask, args.freq_hz, args.torque_amplitude ) diff --git a/examples/LBRWrenchSineOverlay.py b/examples/LBRWrenchSineOverlay.py index e986cf8..0a15a17 100644 --- a/examples/LBRWrenchSineOverlay.py +++ b/examples/LBRWrenchSineOverlay.py @@ -4,63 +4,61 @@ import numpy as np -import pyFRI as fri +import pyfri as fri class LBRWrenchSineOverlayClient(fri.LBRClient): """ - Initializes and controls a robot arm to apply sinusoidal wrenches along two - axes, while allowing for monitoring and command mode switching. It updates - wrench values based on frequencies, amplitudes, and phase offsets, ensuring - continuous motion within a periodic range. + Extends an LBR client, allowing it to generate a wrench command that simulates + sine wave oscillations on two axes based on input frequencies and amplitudes. + It updates the wrench command at each state change and sends it to the robot + if in wrench mode. Attributes: - frequencyX (float|None): Initialized in the constructor to hold a frequency - value for the sine overlay along the x-axis. It stores a floating-point - number representing the frequency in Hertz. - frequencyY (float): Assigned a value through the constructor `__init__`. - It represents the frequency of a sine wave along the Y axis. - amplitudeX (float): Used to represent the amplitude of the sine wave in - the x-direction. It determines the magnitude of the oscillation applied - by the wrench command. - amplitudeY (float): Set through the constructor, initialized with a value - passed to the class instantiation. It defines the amplitude of sine - wave for the y-axis component of the wrench signal. - stepWidthX (float): 0 by default. It represents the angle increment for - the sine wave generation on the X-axis, calculated as 2π times the - frequency of oscillation multiplied by the sample time of the robot state. - stepWidthY (float): 0.0 by default. It stores the step width for updating - the Y-component of a wrench, calculated as 2 * pi times frequency Y - times sample time. - phiX (float): 0 by default. It stores a phase offset for sine wave generation - used to calculate wrench components, and is updated based on the - frequency and sample time in the `onStateChange` method. - phiY (float): 0 by default. It accumulates phase changes when commanded - to do so in methods like `command`. Its initial value is reset to zero - when the client state changes to MONITORING_READY. - wrench (ndarray[npfloat32,6]): Initialized to a zero vector with six - elements in its constructor. It represents a wrench applied to the - robot, composed of torque and force components along each axis. + frequencyX (float): Initialized with a user-provided value in the constructor + (`__init__`). It represents the frequency of the sine wave applied to + one axis. + frequencyY (float): Initialized by the user through the constructor method, + along with its corresponding amplitude and phase values for generating + a sine wave in the Y-direction. + amplitudeX (float): Used to represent the amplitude of a sine wave applied + along axis X when generating wrench commands. It is set by the user + via the class constructor. + amplitudeY (float): 2.0 by default. It represents the amplitude or magnitude + of the sine wave applied to joint Y in the wrench command. + stepWidthX (float): 0.0 by default. It represents the increment in phase + angle phiX for each time step, calculated as twice pi times frequencyX + times the sample time. + stepWidthY (float): Initialized to zero during initialization. It is updated + in the `onStateChange` method by calculating it as twice pi times the + product of the frequency Y, the sample time, and math constant pi. + phiX (float): 0 by default. It represents a phase angle used to calculate + the sine wave amplitude for the X-axis (wrench 0). + phiY (float): Initialized to 0.0. It represents the phase angle of the + sine wave for joint Y and is updated incrementally in the `command` + method based on its step width. + wrench (npndarray[float32]): 6 elements long, representing a wrench force + with six degrees of freedom (three for force and three for torque). """ def __init__(self, frequencyX, frequencyY, amplitudeX, amplitudeY): """ - Initializes an instance with parameters that define a sinusoidal wrench, - including frequencies and amplitudes for both X and Y axes, initializing - various internal attributes. + Initializes an object by setting its attributes based on the provided + frequencies and amplitudes for sine wave overlays in X and Y directions, + as well as initializing other variables to represent wrench values and angles. Args: - frequencyX (float): Used to set the frequency of oscillation in the - X-direction. It represents how often the signal repeats itself - along this axis. - frequencyY (float): Set as an attribute of the class instance under - the same name. It is assigned the value passed to it when an object - of the class is created. - amplitudeX (float): 0 by default when not specified. It represents the - magnitude or size of a movement in the X-direction, likely for a - sinusoidal motion or oscillation. - amplitudeY (float): Used to set the amplitude of the movement in the - Y direction. + frequencyX (float): Set to represent the frequency in the x-direction. + frequencyY (float): Initialized with the value passed to it in the + function call. It represents a frequency associated with one + dimension of an oscillation or vibration, likely in a two-dimensional + context. + amplitudeX (float): Initialized with a value provided by the user + during instantiation of an object. It represents the amplitude of + a sinusoidal wave in the X direction. + amplitudeY (float): Initialized to represent an amplitude in a + two-dimensional oscillation or motion along the Y-axis, typically + used for mathematical models or simulation purposes. """ super().__init__() @@ -79,16 +77,18 @@ def monitor(self): def onStateChange(self, old_state, new_state): """ - Resets certain variables when the robot's state changes to MONITORING_READY, - preparing it for new movements with specified frequencies and sample times. + Resets and recalculates certain internal parameters when the robot's state + transitions to MONITORING_READY. Args: - old_state (ESessionState | None): Used to represent the old state of - the system before it changes, which in this case is expected to - be updated when the new state transitions into MONITORING_READY. - new_state (EnumMember[fri.ESessionState]): Checked against the value - fri.ESessionState.MONITORING_READY indicating that the session has - reached a state of readiness for monitoring. + old_state (fri.ESessionState): Not used within the provided code + snippet. Its purpose appears to be a reference to a previous state, + although its value is immediately discarded upon comparison with + the new state. + new_state (EnumMember[fri.ESessionState]): Used to identify the state + of a session. It holds the new state value after an event or + transition occurs. In this case, it checks for the 'MONITORING_READY' + state. """ if new_state == fri.ESessionState.MONITORING_READY: @@ -103,10 +103,9 @@ def onStateChange(self, old_state, new_state): def waitForCommand(self): """ - Sets the joint positions of the robot to its IPO (In-Positioning Override) - joint positions and optionally applies a wrench command based on the - client's current command mode. It is likely called after receiving a new - command from the server. + Synchronizes robot joint positions and wrench settings with current client + command mode, updating joint positions according to IPO (Intermediate Pose + Option) data when in WRENCH mode. """ self.robotCommand().setJointPosition(self.robotState().getIpoJointPosition()) @@ -115,10 +114,9 @@ def waitForCommand(self): def command(self): """ - Generates and sends wrench commands to a robot in a specific client mode. - It sets joint positions based on IPO data, calculates sinusoidal wrench - values for X and Y axes, and updates these values over time by incrementing - phase angles modulo 2π. + Generates and sends joint position commands to the robot, implementing a + sine wave pattern for the wrench commands when the client command mode is + set to WRENCH. """ self.robotCommand().setJointPosition(self.robotState().getIpoJointPosition()) @@ -138,17 +136,17 @@ def command(self): self.robotCommand().setWrench(self.wrench) -def get_arguments(): +def args_factory(): """ - Parses command-line arguments using the `argparse` library, extracting and - returning their values as a structured object. It defines several optional - arguments with default values for various settings related to communication - and sine wave generation. + Creates an argument parser to extract user input from command-line arguments. + It defines several optional parameters, including hostname, port, frequencies, + and amplitudes for sine waves in x- and y-axes, and returns a parsed namespace + object containing these values. Returns: - argparseNamespace: An object containing all the arguments passed as a - dictionary-like object, where each key is the argument name and the - corresponding value is the parsed argument from user input. + argparseNamespace: An object containing all arguments parsed from command + line and their values. This includes hostname, port, frequencyX, frequencyY, + amplitudeX, and amplitudeY. """ parser = argparse.ArgumentParser(description="LRBJointSineOverlay example.") @@ -199,18 +197,18 @@ def get_arguments(): def main(): """ - Initializes and runs an FRI client application, connecting to a KUKA Sunrise - controller, executing a wrench-sine-overlay motion plan, and handling any - errors or interruptions that may occur during execution. + Initializes a client application to connect to a KUKA Sunrise controller, sets + up an overlay for sine movement, and runs indefinitely until idle or interrupted + by a KeyboardInterrupt. Returns: - int|None: 1 if connection to KUKA Sunrise controller fails and 0 otherwise, - or None if the function terminates due to an exception being caught. + int: 1 if connection to KUKA Sunrise controller failed, and 0 otherwise + indicating successful execution. """ print("Running FRI Version:", fri.FRI_CLIENT_VERSION) - args = get_arguments() + args = args_factory() print(args) client = LBRWrenchSineOverlayClient( args.frequencyX, args.frequencyY, args.amplitudeX, args.amplitudeY diff --git a/examples/hand_guide.py b/examples/hand_guide.py index c8c8163..e034aaa 100644 --- a/examples/hand_guide.py +++ b/examples/hand_guide.py @@ -4,9 +4,9 @@ import numpy as np from admittance import AdmittanceController -import pyFRI as fri -from pyFRI.tools.filters import ExponentialStateFilter -from pyFRI.tools.state_estimators import ( +import pyfri as fri +from pyfri.tools.filters import ExponentialStateFilter +from pyfri.tools.state_estimators import ( FRIExternalTorqueEstimator, JointStateEstimator, WrenchEstimatorTaskOffset, @@ -20,40 +20,39 @@ class HandGuideClient(fri.LBRClient): """ - Initializes a hand guide client for an FRI (Force-Controlled Robot Interface) - compliant robot arm. It estimates joint states and wrenches, applies filters - to stabilize control inputs, and issues position commands to the robot based - on estimated values and controller feedback. + Simulates an admittance controller for a robot arm's end-effector, estimating + and filtering wrenches to adjust joint positions in real-time based on external + forces applied to the arm. It ensures a specific client command mode is used. Attributes: - controller (AdmittanceController|None): Initialized with lbr_ver in the - __init__ method, representing a controller object for admittance control - of a robot manipulator. - joint_state_estimator (JointStateEstimator): Initialized with `self` as - its argument, suggesting that it is used to estimate the joint states - of the robot based on data from the client itself. - external_torque_estimator (FRIExternalTorqueEstimator|None): Created by - calling the constructor FRIExternalTorqueEstimator with a reference - to the current object as its argument. - wrench_estimator (WrenchEstimatorTaskOffset|None): An instance of - WrenchEstimatorTaskOffset class. It estimates the wrench applied to - the end-effector of the robot. - wrench_filter (ExponentialStateFilter): Not used directly within the methods - in this snippet, but its filter method is called to modify a wrench - measurement before passing it through the controller. + controller (AdmittanceController): Initialized with a parameter lbr_ver + in the `__init__` method. It represents an admittance control strategy + used to update joint positions based on estimated wrenches and robot + state. + joint_state_estimator (JointStateEstimator|None): Initialized with a + JointStateEstimator instance that takes self as its argument, suggesting + it estimates the state of the robot's joints. + external_torque_estimator (FRIExternalTorqueEstimator|None): Initialized + with the HandGuideClient instance as its self reference in the `__init__` + method. It estimates external torques on the robot arm. + wrench_estimator (WrenchEstimatorTaskOffset|None): Initialized with a set + of four arguments: the `self`, its `joint_state_estimator`, + `external_torque_estimator`, `controller.robot`, and `controller.ee_link`. + wrench_filter (ExponentialStateFilter|None): Initialized as such: + `self.wrench_filter = ExponentialStateFilter()`. It appears to be a + filter for exponential smoothing of wrenches. """ def __init__(self, lbr_ver): """ - Initializes various components for admittance control, including estimators - and filters, setting up relationships between them based on their dependencies. - It sets up a complete structure for controlling an LBR robot arm using - admittance control methods. + Initializes various components including an admittance controller, joint + state estimator, external torque estimator, wrench estimator, and exponential + state filter for a Franka robot arm with specified LBR version. Args: - lbr_ver (str | int): Used to specify the version or variant of the - KUKA LBR robot arm model being controlled by the AdmittanceController - class. + lbr_ver (str | int): An identifier that specifies the type or version + of the LBR (Lightweight Robot) being used. It is passed to various + classes for specific initialization, configuration, or control purposes. """ super().__init__() @@ -80,9 +79,10 @@ def onStateChange(self, old_state, new_state): def waitForCommand(self): """ - Waits for a specific client command mode to be enabled, updates the wrench - estimator, retrieves the current joint position and stores it as self.q, - then calls the command_position method to send a command based on self.q. + Ensures that the client command mode is set to POSITION before proceeding + with further operations. If not, it raises an exit system. Otherwise, it + retrieves the current Ipo joint position and calls the `command_position` + method. """ if self.robotState().getClientCommandMode() != POSITION: @@ -91,16 +91,14 @@ def waitForCommand(self): ) raise SystemExit - self.wrench_estimator.update() self.q = self.robotState().getIpoJointPosition() self.command_position() def command(self): """ - Updates the robot's joint position based on filtered wrench data from an - estimator, controller and filter. It only proceeds with this calculation - if the estimator is ready. Otherwise, it updates the estimator and resets - the joint position. + Generates and sends joint position commands to the robot based on filtered + wrench data from the environment, using a control strategy defined by the + controller function. """ if not self.wrench_estimator.ready(): @@ -122,16 +120,15 @@ def command(self): self.command_position() -def get_arguments(): +def args_factory(): """ - Initializes an argument parser and defines command-line arguments for a script, - including hostname, port, and KUKA LBR Med version number, returning parsed - arguments as an object. + Parses command-line arguments using `argparse`. It defines three required + arguments: hostname, port, and lbr-ver, with specific data types and validation + rules for each. The parsed arguments are then returned as a namespace object. Returns: - argparseNamespace: An object containing parsed arguments from the command - line, specifically hostname, port, and lbr-ver (KUKA LBR Med version number) - along with their corresponding values. + argparseNamespace: An object containing all the command-line arguments + parsed by the ArgumentParser instance. """ parser = argparse.ArgumentParser(description="LRBJointSineOverlay example.") @@ -162,19 +159,19 @@ def get_arguments(): def main(): """ - Initializes and runs a client application for interacting with a KUKA Sunrise - controller, executing steps until the robot reaches an idle state or interrupted - by user input. It handles connections, disconnections, and exceptions. + Establishes a connection to a KUKA Sunrise controller, runs an application + loop, and ensures proper cleanup upon exit or termination. It also displays + version information and error messages as necessary. The connection is maintained + until the controller enters an idle state. Returns: - int: 0 on successful execution and 1 if connection to KUKA Sunrise controller - fails, indicating an unsuccessful run. The returned value is used as a - return code for the program. + int: 0 on successful execution and 1 on failure to connect to KUKA Sunrise + controller. """ print("Running FRI Version:", fri.FRI_CLIENT_VERSION) - args = get_arguments() + args = args_factory() client = HandGuideClient(args.lbr_ver) app = fri.ClientApplication(client) success = app.connect(args.port, args.hostname) diff --git a/examples/joint_teleop.py b/examples/joint_teleop.py index 98ffdf0..eec0ba8 100644 --- a/examples/joint_teleop.py +++ b/examples/joint_teleop.py @@ -1,11 +1,12 @@ +import argparse import sys +# FRI Client: https://github.com/cmower/FRI-Client-SDK_Python +import pyfri as fri + # PyGame: https://www.pygame.org/news import pygame -# FRI Client: https://github.com/cmower/FRI-Client-SDK_Python -import pyFRI as fri - pygame.init() # NumPy: https://numpy.org/ @@ -14,34 +15,35 @@ class Keyboard: """ - Initializes a Pygame window and controls a virtual joint with key presses. It - can be turned on or off by pressing numbered keys, and its direction is - controlled by left and right arrow keys, with an escape key for exit. + Handles user input to control a joint's movement. It sets up a Pygame display + and responds to key presses and releases to turn joints on/off, change direction, + and adjust velocity. The current state is returned along with the maximum + allowed velocity. Attributes: - max_joint_velocity (float): Used to represent the maximum allowed velocity - for a joint, set to 5 degrees radian when initialized using the - `np.deg2rad(5)` function. - joint_index (NoneType|int): Initialized to None in the __init__ method. - It stores the index of the currently active joint, which ranges from - 1 to 7 (inclusive) when one of the number keys 1-7 is pressed. - joint_velocity (float): 0 by default. It accumulates and stores the net - velocity change applied to a joint based on the direction keys pressed, - bounded within a maximum limit defined by `max_joint_velocity`. - key_to_dir_map (Dict[int,float]): Initialized with a dictionary that maps - keys from Pygame's constant for left arrow to a float value of 1.0, - and right arrow key to -1.0. - key_joint_map (List[pygameK_]|pygameK_ESCAPE): 7 elements long, which maps - keyboard keys to their corresponding joints. It stores a list of - integers representing the key codes of the numbered keys on the keyboard - (1-7). + max_joint_velocity (float): Set to `np.deg2rad(5)`, which is equivalent + to 0.0872665 radians, representing the maximum velocity allowed for a + joint. + joint_index (NoneType|int): Initialized to None. It keeps track of the + currently selected joint, with indices starting from 1, incrementing + from left to right for the number keys. + joint_velocity (float): 0 by default. It is used to represent the speed + at which a joint is being turned, updated based on user input from + keys mapped to direction changes. + key_to_dir_map (Dict[int,float]): Used to map Pygame key codes to + floating-point values representing direction. It maps two keys (LEFT, + RIGHT) to -1.0 and 1.0 respectively. + key_joint_map (List[int]): Mapped to a list of integers representing keys + on the keyboard, specifically numeric keys from 1 to 7. These keys are + used to turn individual joints on or off. """ def __init__(self): """ - Initializes several attributes for handling joint movements using keyboard - input, including display settings, joint velocity limits, and mapping key - presses to direction and joint values. + Initializes various attributes, including display mode, maximum joint + velocity, and key mapping dictionaries. These settings configure the + keyboard controls for a game or interactive application. It prepares the + environment for further interactions. """ pygame.display.set_mode((300, 300)) @@ -68,12 +70,14 @@ def __init__(self): def __call__(self): """ - Manages user input from Pygame events. It handles window close, keyboard - key presses and releases, and updates joint velocity accordingly. + Processes events from Pygame and updates the state of a joint based on + user input, such as key presses or releases, to control its velocity. It + also handles quit and escape events to terminate the program. Returns: - tuple[int,float]: A pair containing an integer indicating the currently - selected joint index and a float representing the scaled joint velocity. + Tuple[int,float]: A combination of an integer representing the index + of the currently active joint and a float indicating the velocity of + that joint, scaled to a maximum allowed value. """ for event in pygame.event.get(): @@ -114,29 +118,27 @@ def __call__(self): class TeleopClient(fri.LBRClient): """ - Enables teleoperation of a robot using keyboard input. It monitors the robot's - state, waits for user commands, and sends control commands to the robot based - on user input, allowing for real-time joint position or torque control. + Provides a teleoperation interface for a robotic arm, allowing users to control + joint positions and torques using keyboard input and PyGame window interaction. + It handles state changes, command processing, and torque application accordingly. Attributes: - keyboard (Callable[[],Tuple[int|None,float]]): Likely a method that returns - the index of the joint to be controlled or None if no control is - desired, along with the velocity goal for that joint. - torques (npndarray[float32]|None): Initialized as a NumPy array of zeros - with length fri.LBRState.NUMBER_OF_JOINTS when the instance is created. - It represents joint torques in Newton meters. + keyboard (Callable[[],Tuple[int|None,float]]): Associated with a keyboard + input handling mechanism, likely related to PyGame or similar library. + It returns a joint index and velocity goal upon key press. + torques (npndarray[float32]): Initialized to zeros with a length equal to + the number of joints, as defined by fri.LBRState.NUMBER_OF_JOINTS. """ def __init__(self, keyboard): """ - Initializes an instance with a keyboard object and sets up attributes to - store the keyboard and torques for the LBR robot, initializing the torques - array with zeros. + Initializes an instance by calling its superclass's constructor, assigning + the keyboard object to an attribute, and initializing a torques array with + zeros based on the LBRState NUMBERS_OF_JOINTS constant. Args: - keyboard (object): Assigned to the instance variable `self.keyboard`. - Its purpose is not specified within the code, but it may represent - an external input device or interface for user interaction. + keyboard (fri.Keyboard | None): Used to store the keyboard object + passed from the environment. """ super().__init__() @@ -148,18 +150,17 @@ def monitor(self): def onStateChange(self, old_state, new_state): """ - Prints state changes and handles transitions to specific states, notably - when the robot reaches the MONITORING_READY state. It resets certain - attributes and displays instructions for controlling the robot joints using - keyboard inputs. + Prints state change messages and performs specific actions when the robot + session enters monitoring-ready state, such as resetting joint torques and + printing control instructions. Args: - old_state (Union[fri.ESessionState, fri.ESessionState | str]): Apparently - an enumeration object or a string representing the previous state - of the session. - new_state (fri.ESessionState): Used to store the current state of the - session. In this specific case, it can be one of the enumeration - values defined under fri.ESessionState. + old_state (fri.ESessionState | None): Used to represent the state of + the robot session before the change occurred. It stores the previous + state when the new state changes. + new_state (Enum[fri.ESessionState]): Set to fri.ESessionState.MONITORING_READY + upon state change, indicating that the robot is now in monitoring + mode ready to execute tasks. """ print(f"State changed from {old_state} to {new_state}") @@ -180,9 +181,9 @@ def onStateChange(self, old_state, new_state): def waitForCommand(self): """ - Retrieves the current joint position from the robot and sets it as the new - target position for the joints, updating the torque command if the client - is in torque mode. + Synchronizes the robot's state with its commanded state by setting joint + positions and torque values based on current IPo (Inverse Positioning + Operation) joint position data from the robot. """ self.q = self.robotState().getIpoJointPosition() @@ -193,10 +194,9 @@ def waitForCommand(self): def command(self): """ - Receives user input from the keyboard and updates the robot's joint positions - based on velocity goals, before sending updated position commands to the - robot's control interface. It also sets torque values when operating in - torque mode. + Updates the robot's joint positions and torques based on user input from + the keyboard. The new joint position is calculated by adding a velocity + goal to the current joint angle, then sent to the robot for execution. """ joint_index, vgoal = self.keyboard() @@ -210,16 +210,16 @@ def command(self): self.robotCommand().setTorque(self.torques.astype(np.float32)) -def get_arguments(): +def args_factory(): """ - Parses command-line arguments using `argparse`. It defines two optional - arguments: `hostname` and `port`, which are used to specify communication - settings with a KUKA Sunrise Controller. The parsed arguments are then returned - as an object. + Parses command-line arguments using argparse. It creates a parser, defines two + optional arguments (hostname and port) with default values, and returns the + parsed arguments as an object containing these values. Returns: - argparseNamespace: An object containing the parsed command line arguments. - This includes the values for the specified options such as hostname and port. + argparseNamespace: An object that holds all arguments passed to the script. + It contains key-value pairs for hostname and port, among others specified + by add_argument calls. """ parser = argparse.ArgumentParser(description="LRBJointSineOverlay example.") @@ -242,19 +242,18 @@ def get_arguments(): def main(): """ - Initializes and connects to a KUKA Sunrise controller, establishing a teleoperation - session using keyboard input and prints messages indicating success or failure - at each stage. It then enters a loop where it repeatedly checks for the robot's - state until idle or interrupted. + Initializes a FRI client application, connects to a KUKA Sunrise controller, + and runs an infinite loop where it continuously checks for robot state changes + until the session becomes idle or interrupted by the user. Returns: - int: 1 if connection to KUKA Sunrise controller fails and 0 otherwise. - This indicates whether the program executed successfully or not. + int|str: 1 on failure and 0 on success indicating whether the connection + to KUKA Sunrise controller was established or not. """ print("Running FRI Version:", fri.FRI_CLIENT_VERSION) - args = get_arguments() + args = args_factory() keyboard = Keyboard() client = TeleopClient(keyboard) app = fri.ClientApplication(client) diff --git a/examples/task_teleop.py b/examples/task_teleop.py index d7d18d4..ddea439 100644 --- a/examples/task_teleop.py +++ b/examples/task_teleop.py @@ -6,7 +6,7 @@ import pygame # FRI Client: https://github.com/cmower/FRI-Client-SDK_Python -import pyFRI as fri +import pyfri as fri pygame.init() @@ -21,8 +21,9 @@ def print_instructions(): """ - Prints a set of instructions to control a robot using direction keys and task - axes on the PyGame window. + Displays a block of text explaining how to control robot joints using keyboard + keys and PyGame window focus. The output is formatted with line separators and + dashes for visual clarity. """ print("\n") @@ -35,36 +36,34 @@ def print_instructions(): class Keyboard: """ - Handles keyboard events to control tasks with velocities. It maps keys to task - indices and velocities, enabling task activation/deactivation and velocity - adjustments using arrow keys or dedicated key shortcuts. It also supports quit - event handling. + Handles user input from a Pygame application. It monitors for key presses and + releases to control tasks and their velocities. The class maps keyboard keys + to task labels or direction values, enabling users to interact with tasks using + specific keys. Attributes: - max_task_velocity (float): 0.04 by default. It represents the maximum - velocity for a task, which can be scaled down using the `self.task_velocity` - attribute when keys are held to control task movement speed. - task_index (None|int): Set to a specific key when it is pressed and its - corresponding task label is turned ON or OFF, otherwise, its value - remains unchanged until the corresponding key is pressed again. - task_velocity (float): 0.0 by default. It accumulates changes based on - keyboard input, indicating how fast a task should be executed with - positive values representing increasing speed and negative values - representing decreasing speed. - key_to_dir_map (Dict[pygameK_LEFT,float]|Dict[pygameK_RIGHT,float]): - Initialized with mappings from pygame keyboard keys to floating point - numbers representing direction values. The map assigns a value of 1.0 - to the LEFT key and -1.0 to the RIGHT key. - key_task_map (OrderedDict[pygameK_x|pygameK_y||pygameK_a,str]): Initialized - to store key-value pairs where keys are specific keyboard keys (e.g., - pygame.K_x) and values are corresponding task labels (e.g., "x"). + max_task_velocity (float): 0.04. This value represents the maximum velocity + with which tasks can be executed based on user input via keyboard. It + scales user input to a specific range of values for task execution. + task_index (NoneType|int): Used to keep track of the current task being + controlled by the user, with a value of None indicating that no task + is currently selected. + task_velocity (float): 0.0 by default. It accumulates change based on key + presses or releases from keys assigned to direction changes, with a + maximum limit set by `self.max_task_velocity`. + key_to_dir_map (Dict[int,float]): Initialized with two key-value pairs, + mapping Pygame keyboard keys to direction values. It maps LEFT arrow + key to a value of -1.0 and RIGHT arrow key to a value of 1.0. + key_task_map (OrderedDict[pygameK_x|pygameK_y||pygameK_a,str]): Used to + map keys on the keyboard to corresponding tasks, represented by string + labels such as "x", "y", etc. """ def __init__(self): """ - Initializes game window settings, defines task velocity and mapping, sets - initial key-task mappings, and establishes relationships between keyboard - keys and their corresponding tasks or directions. + Initializes pygame display, sets task velocity and index to default values, + defines key-to-direction mapping, creates an ordered dictionary mapping + keys to tasks, and assigns various keys to corresponding task mappings. """ pygame.display.set_mode((300, 300)) @@ -89,15 +88,16 @@ def __init__(self): def __call__(self): """ - Handles Pygame events to control tasks and task velocity by keyboard input, - allowing users to turn tasks on/off with specific keys and adjust - direction/velocity with others. It also exits the program upon QUIT or - ESCAPE key press. + Handles Pygame events to manage task execution and velocity control based + on keyboard input. It monitors key presses, turns tasks on or off, and + adjusts task velocity by adding or subtracting direction values when keys + are pressed or released. Returns: - Tuple[int,float]: A tuple containing two values: the current task index - and the scaled velocity of the task. The task index and velocity are - computed based on user input events. + Tuple[Optional[int],float]: 2-element tuple containing: + + * The index of currently active task (or None if no task is active) + * A float representing current task velocity scaled by max allowed velocity. """ for event in pygame.event.get(): @@ -139,33 +139,33 @@ def __call__(self): class TeleopClient(fri.LBRClient): """ - Handles robot teleoperation, monitoring its state and sending commands to the - robot based on user input from a keyboard or other interface. It implements a - kinematic inverse solution for joint position updates. + Implements a teleoperation interface for controlling a robotic arm, using + inverse kinematics to calculate joint positions and torques based on user input + from a keyboard or other external source. Attributes: - ik (Callable[[npndarray,npndarray,float],npndarray]): Used to perform - inverse kinematics calculations, mapping desired joint velocities to - joint positions. Its implementation is not provided in this code snippet. - keyboard (Keyboard|friLBRClient): Used to get tasks from a keyboard input, - where it returns a task index and velocity goal for each key pressed. - It appears to be responsible for translating user input into robot commands. - torques (npndarray[float32]): Initialized with zeros, representing the - torques applied to each joint of a robot arm. It is used to set torque - commands when the client command mode is set to TORQUE. + ik (Callable[[npndarray,npndarray,float],npndarray]): Used to compute the + inverse kinematics (IK) of a robot arm. It takes as input the current + joint positions, desired joint velocities, and sample time, and returns + the new joint positions. + keyboard (object): Assumed to be an instance of a class that manages + keyboard input. + torques (npndarray[float32]): Initialized with zeros to represent joint + torques. It stores the torques for each of the LBR robot's joints, + which are set based on the client command mode. """ def __init__(self, ik, keyboard): """ - Initializes an instance with input kinematics (ik) and keyboard controls. - It sets up the LBR state, storing zeros for torques, indicating no initial - torque. + Initializes an instance by setting its attributes: ik (inverse kinematics), + keyboard, and torques as zero vectors with a specific number of joints. + It also calls the parent class's constructor using super().__init__. Args: - ik (fri.IK): Initialized as an attribute `self.ik`. The exact nature - of this IK object is not provided, but it is assumed to be related - to inverse kinematics calculations. - keyboard (object): Assigned to an instance variable named `self.keyboard`. + ik (object): Assigned to an instance variable of the same name, self.ik. + Its purpose or contents are not specified by this code snippet. + keyboard (object): Referenced as an attribute of instances from this + class. """ super().__init__() @@ -178,16 +178,19 @@ def monitor(self): def onStateChange(self, old_state, new_state): """ - Prints state change notifications and updates internal variables when the - client enters a monitoring ready state, including resetting torque values - and queue. + Resets its internal state when the robot reaches a specific monitoring + ready state, including clearing an active task queue and resetting joint + torques to zero. Args: - old_state (ESessionState | str): Used to store the previous state of - the session before it changes. - new_state (fri.ESessionState | Enum): Used to determine whether an - instruction should be printed, specifically when it reaches the - 'MONITORING_READY' state. + old_state (fri.ESessionState | None): Specified by the function's + signature, indicating that it can take on any value from the enum + fri.ESessionState or be None. Its purpose is to represent the + previous state of the system before the change. + new_state (enum.Enum): Used to track the state of a process, specifically + the ESessionState.MONITORING_READY state, which represents when + monitoring is ready. It's expected to be one of the values from + an enumeration called fri.ESessionState. """ print(f"State changed from {old_state} to {new_state}") @@ -200,10 +203,9 @@ def onStateChange(self, old_state, new_state): def waitForCommand(self): """ - Synchronizes joint positions and torque values with the robot's state, - ensuring that any commands sent by the client are applied to the robot's - current configuration. It sets joint positions and torque values according - to the provided data types. + Updates the robot's joint positions based on the current IPO position and + applies torque if the client command mode is set to TORQUE. It sends these + commands to the robot. """ self.q = self.robotState().getIpoJointPosition() @@ -214,9 +216,9 @@ def waitForCommand(self): def command(self): """ - Updates the robot's joint positions and torques based on user input from - the keyboard, utilizing inverse kinematics to compute optimal joint values - for a specific task index and corresponding velocity goal. + Generates joint positions and/or torques for an LBR robot, based on user + input from the keyboard. It then sends these commands to the robot, using + either position or torque control mode depending on its current configuration. """ task_index, vgoal = self.keyboard() @@ -233,15 +235,17 @@ def command(self): self.robotCommand().setTorque(self.torques.astype(np.float32)) -def get_arguments(): +def args_factory(): """ - Parses command line arguments using the argparse module to provide a structured - way to retrieve input parameters for a KUKA Sunrise Controller communication - script, such as hostname, port, and LBR Med version number. + Parses command line arguments into a namespace object using the ArgumentParser + class from the argparse module. It sets up and validates options for a KUKA + Sunrise Controller connection, including hostname, port, and LBR Med version + number. Returns: - argparseNamespace: An object containing attributes for each argument passed - to it. + argparseNamespace: A container object that holds data passed to a script + by way of its command-line arguments. The actual content varies based on + the provided arguments. """ parser = argparse.ArgumentParser(description="LRBJointSineOverlay example.") @@ -272,18 +276,19 @@ def get_arguments(): def main(): """ - Establishes a connection to a KUKA Sunrise controller, initializes an application - using the client and keyboard, and enters a loop where it continuously steps - through the application until a disconnect or idle session state is encountered. + Initializes a connection to a KUKA Sunrise controller, establishes a teleoperation + client application, and enters a loop where it continuously receives and + processes robot commands until idle or interrupted. Returns: - int|None: 0 on successful execution and 1 if connection to KUKA Sunrise - controller fails. + int|None: 1 if connection to KUKA Sunrise controller fails or None when + interrupted by a system exit or keyboard interrupt. Otherwise, it returns + 0 upon successful execution and disconnection from the controller. """ print("Running FRI Version:", fri.FRI_CLIENT_VERSION) - args = get_arguments() + args = args_factory() ik = IK(args.lbr_ver) keyboard = Keyboard() client = TeleopClient(ik, keyboard) diff --git a/pyfri/tools/filters.py b/pyfri/tools/filters.py new file mode 100644 index 0000000..4008e1d --- /dev/null +++ b/pyfri/tools/filters.py @@ -0,0 +1,136 @@ +import abc +import numpy as np +from collections import deque + + +class StateFilter(abc.ABC): + """ + Initializes a window of fixed size for state tracking. It resets the window + on initialization and when reset() is called. The `append()` method adds data + to the window, and the `filter()` method, which must be implemented by subclasses, + processes the data in the window. + + Attributes: + _window_size (int): Set to window_size during instantiation. It determines + the maximum number of elements that can be stored in the _window deque, + which implements a bounded-length queue data structure with a specified + maxlen. + reset (Callable[[],None]): Defined as a method named reset. It clears the + internal window when called, preparing it for use with a new sequence + or dataset. The default implementation resets the _window deque. + + """ + def __init__(self, window_size): + """ + Initializes an instance with a given window size and calls the `reset` + method to set the internal state accordingly, suggesting it maintains some + sort of memory or history based on this size. + + Args: + window_size (int): Passed to the class constructor. It initializes an + instance variable `_window_size` which stores this value, determining + the size of some window or buffer in subsequent operations. + + """ + self._window_size = window_size + self.reset() + + def reset(self): + self._window = deque([], maxlen=self._window_size) + + def append(self, x): + self._window.append(x.tolist()) + + @abc.abstractmethod + def filter(self, x): + """ + Defines an abstract interface for filtering input values. Implementing + classes must provide their own concrete filter implementation, while this + method signature serves as a contract specifying the required filter + operation. It takes one argument, x. + + Args: + x (Any): Required to be provided when implementing this abstract method + in any subclass, but its usage or structure is not specified in + this declaration. + + """ + pass + + +class ExponentialStateFilter(StateFilter): + """ + Applies an exponential smoothing filter to a time series data. It uses a window + to store previous values, and each new value is calculated as a weighted average + between the current value and the exponentially smoothed previous value. + + Attributes: + _smooth (float): Set to a default value of 0.02 during initialization. It + controls the smoothing effect by determining the proportion of new + values to be used for filtering. + + """ + def __init__(self, smooth=0.02): + """ + Initializes its instance variables: it calls the superclass's constructor + with an argument of 1, and sets the _smooth attribute to a specified value + by default equal to 0.02. + + Args: + smooth (float | int): 0.02 by default. It appears to be used for + smoothing purposes, possibly related to animation or easing effects, + and its value affects the behavior of the class instance. + + """ + super().__init__(1) + self._smooth = smooth + + def filter(self, x): + """ + Calculates an exponentially smoothed value of input x, replacing the oldest + window element with the new value at each step. It maintains a sliding + window of values and returns the most recent smoothed value. + + Args: + x (float | np.ndarray): 1 value representing new input data to be + processed by the filter, appended to the internal window and + smoothed according to the specified smoothing coefficient. + + Returns: + float: A filtered version of input parameter `x`. + + """ + if len(self._window) == 0: + self.append(x) + xp = np.array(self._window[0]) + xf = self._smooth * x + (1.0 - self._smooth) * xp + self.append(xf) + return xf + + +class MovingAverageFilter(StateFilter): + """ + Calculates the moving average of input data. It stores the input values in a + window and returns their mean at each step, effectively smoothing out noise + and providing a running average of the data. The axis=0 parameter indicates + that mean is calculated along columns. + + """ + def filter(self, x): + """ + Appends new data to an internal window and returns the mean value of all + elements within that window, computed along each dimension (axis=0). + + Args: + x (Union[List[float], float, np.ndarray]): Expected to be a single + data point or multiple data points that are used for filtering. + The type annotation indicates it can take various input formats. + + Returns: + numpyndarray: 1-dimensional array representing the mean of a specified + window of values stored in the object's _window attribute after appending + new input x. + + """ + self.append(x) + return np.mean(self._window, axis=0) diff --git a/pyfri/tools/state_estimators.py b/pyfri/tools/state_estimators.py new file mode 100644 index 0000000..fe1c896 --- /dev/null +++ b/pyfri/tools/state_estimators.py @@ -0,0 +1,467 @@ +import abc +import numpy as np +from pyfri import LBRState +from collections import deque + + +# +# Joint state estimator +# + + +class JointStateEstimator: + """ + + JointStateEstimator + =================== + + The JointStateEstimator class keeps track of the joint position, + velocity, and acceleration using the finite difference method. The + joint velocities and accelerations are estimated using a window of + the previous three joint positions. + + """ + + n_window = 3 + + def __init__( + self, + client, + ): + """ + Intercepts and modifies the client's command function to update a moving + window of queue data before executing the original command, thereby updating + the estimator's internal state. + + Args: + client (object | ClientType): Assigned to the instance variable `_client`. + + """ + # Set class attributes/variables + self._client = client + self._first_update = True + self._q = deque([], maxlen=self.n_window) + self._dq = deque([], maxlen=self.n_window - 1) + self._ddq = deque([], maxlen=self.n_window - 2) + + # Monkey patch update_window into command method + orig_command = self._client.command + + def command(*args, **kwargs): + """ + Executes an original command after updating the window state. It takes + variable arguments and keyword arguments from the caller, delegates + them to the original `orig_command`, and handles any changes that may + require a window update. + + Args: + *args (list): List of positional arguments + **kwargs (dict): Dictionary of keyword arguments + + """ + self._update_window() + orig_command(*args, **kwargs) + + self._client.command = command + + def q(self, idx): + return np.array(self._q[idx]) + + def dq(self, idx): + return np.array(self._dq[idx]) + + def ddq(self, idx): + return np.array(self._ddq[idx]) + + def _update_window(self): + """ + Updates a window of joint states and their derivatives, using data from a + robot state client. It calculates and stores joint velocities (dq) and + accelerations (ddq) based on consecutive measured positions. + + """ + # Retrieve state + dt = self._client.robotState().getSampleTime() + q = self._client.robotState().getMeasuredJointPosition().flatten().tolist() + + # Update window + self._q.append(q) + if self._first_update: + for _ in range(self.n_window): + self._q.append(q) + self._dq.append([0.0] * LBRState.NUMBER_OF_JOINTS) + self._ddq.append([0.0] * LBRState.NUMBER_OF_JOINTS) + self._first_update = False + + dq = (self.q(-1) - self.q(-2)) / dt + self._dq.append(dq.tolist()) + + ddq = (self.dq(-1) - self.dq(-2)) / dt + self._ddq.append(ddq.tolist()) + + def get_position(self): + return self.q(-1) + + def get_velocity(self): + return self.dq(-1) + + def get_acceleration(self): + return self.ddq(-1) + + +class TaskSpaceStateEstimator: + """ + + TaskSpaceStateEstimator + ======================= + + The TaskSpaceStateEstimator class allows you to estimate a given + end-effector transform (position and orientation), velocity, and + acceleration. + + """ + + def __init__( + self, client, joint_state_estimator, robot_model, ee_link, base_link=None + ): + """ + Initializes its attributes by setting up various link and Jacobian functions + for a robot model based on given inputs. It uses these to prepare a task + space state estimator. + + Args: + client (object | ClientType): Assigned to self._client, indicating + that it is likely an external interface or connection object that + enables interactions with an external system. + joint_state_estimator (object): Referenced by its instance or class, + indicating it's an estimator used to calculate joint states of a + robot arm from sensor data. + robot_model (object): Expected to provide methods for calculating + transformation functions (`get_global_link_transform_function`, + `get_link_transform_function`) and geometric Jacobian functions + (`get_global_link_geometric_jacobian_function`, `get_link_geometric_jacobian_function`). + ee_link (str | int): Required, representing the end-effector link of + the robot model. It identifies the specific link from which + transformations and Jacobians are obtained. + base_link (str | NoneType): Used to specify the link with respect to + which forward kinematics and Jacobian transformations are calculated. + It defaults to None if not provided. + + """ + self._client = client + self._joint_state_estimator = joint_state_estimator + + # Retrieve kinematics models function + if base_link is None: + self._T = robot_model.get_global_link_transform_function( + ee_link, numpy_output=True + ) + self._J = robot_model.get_global_link_geometric_jacobian_function( + ee_link, numpy_output=True + ) + elif isinstance(base_link, str): + self._T = robot_model.get_link_transform_function( + ee_link, base_link, numpy_output=True + ) + self._J = robot_model.get_link_geometric_jacobian_function( + ee_link, base_link, numpy_output=True + ) + else: + raise ValueError(f"{base_link=} was not recognized") + + def get_transform(self): + """ + Computes the transformation matrix given the current joint positions, which + are obtained from the _joint_state_estimator object using its get_position + method. The computed transformation is then returned. + + Returns: + numpyndarray: 4x4 homogenous transformation matrix T, calculated from + joint state q using self._T(q). This transform represents position and + orientation of a rigid body in 3D space. + + """ + q = self._joint_state_estimator.get_position() + return self._T(q) + + def get_velocity(self): + """ + Computes and returns the velocity of an object in task space by taking the + Jacobian matrix `J` and multiplying it with the joint velocities `dq`, + effectively transforming joint velocities to task-space velocities. + + Returns: + numpyndarray: The velocity of the end effector expressed in world coordinates. + + """ + q = self._joint_state_estimator.get_position() + dq = self._joint_state_estimator.get_velocity() + J = self._J(q) + return J @ dq + + def get_acceleration(self): + """ + Computes the acceleration of the end-effector in task space by taking the + difference between consecutive velocity samples, dividing it by the sample + time, and returning the result. + + Returns: + numpyndarray: 6-element acceleration of a robotic arm expressed as + [ax, ay, az, wx, wy, wz]. + + """ + # Retreive joint states + q = self._joint_state_estimator.q(-1) + qp = self._joint_state_estimator.q(-2) + dq = self._joint_state_estimator.dq(-1) + dqp = self._joint_state_estimator.dq(-2) + + # Compute end-effector current and previous velocity + v = self._J(q) @ dq + vp = self._J(qp) @ dqp + + # Compute and return end-effector acceleration + dt = self._client.robotState().getSampleTime() + return (v - vp) / dt + + +class ExternalTorqueEstimator(abc.ABC): + """ + Defines an abstract base class for estimating external torques. It has a single + abstract method, `get_external_torque`, which must be implemented by any + concrete subclass to return an estimate of the external torque acting on a + system or object. + + """ + @abc.abstractmethod + def get_external_torque(self): + """ + Estimates or calculates an external torque value, likely associated with + an object's movement or rotation. It is abstract and intended to be + implemented by concrete subclass implementations. + + """ + pass + + +class FRIExternalTorqueEstimator(ExternalTorqueEstimator): + """ + Estimates external torques applied to a robot by retrieving and flattening the + external torque data from a client. It serves as an estimator for external + forces acting on a robot, enabling calculation of system dynamics without + explicit force sensing. + + Attributes: + _client (RobotStateClient|RobotClient): Referenced from outside this class + through an external dependency. It provides access to a robot's state, + including its external torque. + + """ + def __init__(self, client): + self._client = client + + def get_external_torque(self): + return self._client.robotState().getExternalTorque().flatten() + + +# +# Estimate wrench at a given tip link. +# + + +class WrenchEstimator(abc.ABC): + """ + + WrenchEstimator + =============== + + Sub-classes of the WrenchEstimator can be used to estimate the + external wrench applied at a given link on the robot, i.e. the tip + link. These methods map the measured external joint torques to the + given task space. An offset is applied to remove bias in the + measurements. How that offset is estimated is implemented by the + respective sub-classes. + + """ + + _rcond = 0.05 # Cutoff for small singular values in pseudo-inverse calculation. + + def __init__( + self, + client, + joint_state_estimator, + external_torque_estimator, + robot_model, + tip_link, + base_link=None, + n_data=50, + ): + """ + Initializes its attributes, setting up estimators for joint states and + external torques, computing a Jacobian matrix based on robot model parameters, + and allocating storage for data. It raises an error if base_link is not recognized. + + Args: + client (object): Required to be provided by the user when instantiating + this class, but its purpose or usage within the class is not described. + joint_state_estimator (object | JointStateEstimator | + AnyOtherJointStateEstimatorType): Used to estimate the state of + robot joints. Its actual implementation and parameters depend on + the provided estimator. + external_torque_estimator (object): Referenced as an attribute + `_external_torque_estimator`. It represents an estimator for + external torques acting on the robot. Its exact functionality + depends on its implementation. + robot_model (object): Used to interact with a robot model, accessing + functions related to geometric Jacobians of the robot's links. + tip_link (str | int): Used to specify the tip link (end effector) of + the robot model. + base_link (None | str): Optional, with a default value of None. It + specifies the base link of a robotic arm to which Jacobian + computations are referenced, or its name if known. + n_data (int): 50 by default. It represents the initial size of an + internal data structure (`self._data`) that will hold collected + data. Its value can be changed at initialization. + + """ + # Create class attributes + self._joint_state_estimator = joint_state_estimator + self._external_torque_estimator = external_torque_estimator + + # Setup jacobian function + if base_link is None: + self._jacobian = robot_model.get_global_link_geometric_jacobian_function( + tip_link, + numpy_output=True, + ) + elif isinstance(base_link, str): + self._jacobian = robot_model.get_link_geometric_jacobian_function( + tip_link, + base_link, + numpy_output=True, + ) + else: + raise ValueError(f"{base_link=} is not recognized.") + + # Setup data collector + self._n_data = n_data + self._data = [] + + def _inverse_jacobian(self): + """ + Computes an approximate pseudoinverse of the Jacobian matrix associated + with the joint state estimator at a given position, based on NumPy's + linalg.pinv function and specified conditioning parameter rcond. + + """ + q = self._joint_state_estimator.get_position() + return np.linalg.pinv(self._jacobian(q), rcond=self._rcond) + + def ready(self): + return len(self._data) >= self._n_data + + def update(self): + """ + Checks if the data length is less than the specified n_data threshold. If + true, it calls the _update_data method to update the data. This indicates + a sequential or iterative process where data needs to be replenished periodically. + + """ + if len(self._data) < self._n_data: + self._update_data() + + @abc.abstractmethod + def _update_data(self): + """ + Defines an abstract function that must be implemented by any subclass of + WrenchEstimator, ensuring it provides functionality to update data. The + underscore prefix indicates it is intended for internal use within the + class and should not be overridden directly. + + """ + pass + + @abc.abstractmethod + def get_wrench(self): + """ + Returns a wrench object. + + """ + pass + + +class WrenchEstimatorJointOffset(WrenchEstimator): + """ + + WrenchEstimatorJointOffset + ========================== + + The offset is computed in the joint space and applied prior to the + wrench being estimated. + + """ + + def _update_data(self): + """ + Updates internal data by appending an estimated external torque to a list, + converting it from a NumPy array to a list using `tolist()`. + + """ + tau_ext = self._external_torque_estimator.get_external_torque() + self._data.append(tau_ext.tolist()) + + def get_wrench(self): + """ + Calculates the wrench, an external force and torque applied to a robot + joint, by subtracting an offset from estimated external torque and + transforming it using inverse jacobian. + + Returns: + numpyndarray: A wrench, calculated as the transpose of the inverse + Jacobian multiplied by the external torque minus an offset. The result + represents the equivalent force at each joint. + + """ + offset = np.mean(self._data, axis=0) + tau_ext = self._external_torque_estimator.get_external_torque() - offset + Jinv = self._inverse_jacobian() + return Jinv.T @ tau_ext + + +class WrenchEstimatorTaskOffset(WrenchEstimator): + """ + + WrenchEstimatorTaskOffset + ========================= + + The offset is computed in the task space and applied after the raw + joint torque values are projected to the task space. + + """ + + def _update_data(self): + """ + Updates internal data by computing the external wrench and appending it + to a list, using an external torque estimator and inverse jacobian matrix + for computation. + + """ + tau_ext = self._external_torque_estimator.get_external_torque() + f_ext = self._inverse_jacobian().T @ tau_ext + self._data.append(f_ext.flatten().tolist()) + + def get_wrench(self): + """ + Computes an estimated wrench by subtracting an offset from the product of + the inverse jacobian and external torque estimate. This result is derived + using NumPy operations on data stored within the instance. + + Returns: + ndarray: A vector representing an external force and moment exerted + on a rigid body, calculated based on estimated external torque, inverse + jacobian, and data mean offset. + + """ + offset = np.mean(self._data, axis=0) + tau_ext = self._external_torque_estimator.get_external_torque() + return self._inverse_jacobian().T @ tau_ext - offset diff --git a/setup.py b/setup.py index 426e73b..5041b2b 100644 --- a/setup.py +++ b/setup.py @@ -11,9 +11,10 @@ # Read the configuration settings class UserInputRequired(Exception): """ - Defines a custom exception to handle cases where user input is necessary for - further processing. It extends Python's built-in `Exception` class, allowing - it to be used as an exception type in error handling scenarios. + Defines a custom exception for handling situations where user input is required. + It inherits from Python's built-in `Exception` class and takes an error message + as an argument, allowing it to be raised with a specific error description + when user input is missing or invalid. """ def __init__(self, msg): @@ -40,30 +41,30 @@ def __init__(self, msg): # If you need multiple extensions, see scikit-build. class CMakeExtension(Extension): """ - Initializes a CMake extension with a given name and source directory path. It - sets up a base extension instance and resolves the source directory path to - an absolute file system path, ensuring it's usable within the Python environment. + Initializes a C extension for Cython build process, taking `name` and optional + `sourcedir` as input. It sets the name and sources attributes, then resolves + and converts the sourcedir path to an absolute string. This prepares the + extension for compilation using CMake. Attributes: - sourcedir (Path|str): Initialized with a directory path that resolves to - an absolute file system path using `os.fspath(Path(sourcedir).resolve())`. + sourcedir (Path|str): Used to store the path to a directory containing + CMake source code files, normalized to a string using `os.fspath` and + resolved to an absolute path with `Path.resolve`. """ def __init__(self, name: str, sourcedir: str = "") -> None: """ - Initializes an instance with specified name and sourcedir. If sourcedir - is empty, it defaults to an empty string. It calls the parent class's - constructor, passing name and an empty list of sources. It then resolves - the sourcedir path using os.fspath and Path. + Initializes an object by calling its superclass's `__init__` method with + the provided name and an empty list as sources, then sets the sourcedir + attribute to a resolved path. Args: - name (str): Required (no default value specified) for initializing the - object. It serves as an identifier or label for the object, likely - used to reference it later in the program. - sourcedir (str): Optional, indicated by its default value being an - empty string "". It specifies the directory from which sources - will be taken. The os.fspath and Path.resolve() are used to resolve - it. + name (str): Required. It represents the name of an object, likely a + source or project, which is passed to the parent class's constructor. + sourcedir (str): Optional, defaulting to an empty string "". It + represents a directory path from which source files will be sourced. + The actual value set by the user is normalized with `os.fspath` + and `Path.resolve`. """ super().__init__(name, sources=[]) @@ -72,21 +73,21 @@ def __init__(self, name: str, sourcedir: str = "") -> None: class CMakeBuild(build_ext): """ - Builds CMake extensions for Python packages by executing a series of commands - to configure and build the extension using CMake. It handles various environment - variables and generator configurations for different platforms and compilers. + Implements a custom build extension for building C++ extensions using CMake. + It generates and builds the extension by invoking CMake and its build tools, + handling various environment variables and configuration options along the way. """ def build_extension(self, ext: CMakeExtension) -> None: """ - Builds a C++ extension using cmake, handling various configuration options - and environment variables to generate and compile the extension for different - platforms and architectures. + Builds an extension using CMake by creating and configuring a build + environment for a specified extension, then running CMake to configure and + compile the extension. Args: - ext (CMakeExtension): Not explicitly described in the code snippet - provided. However, based on its use in the code, it appears to be - an object representing a CMake extension that needs to be built. + ext (CMakeExtension): Used to represent an extension or library being + built with CMake. It contains information such as the name, + sourcedir, etc. of the extension. """ # Must be in this form due to bug in .resolve() only fixed in Python 3.10+ @@ -191,14 +192,14 @@ def build_extension(self, ext: CMakeExtension) -> None: setup( - name="pyFRI", - version="1.2.0", + name="pyfri", + version="1.2.1", author="Christopher E. Mower, Martin Huber", author_email="christopher.mower@kcl.ac.uk, m.huber_1994@hotmail.de", description="Python bindings for the FRI Client SDK library.", long_description="", packages=find_packages(), - ext_modules=[CMakeExtension("_pyFRI")], + ext_modules=[CMakeExtension("_pyfri")], install_requires=["numpy", "pygame", "pyoptas", "pandas", "matplotlib"], cmdclass={"build_ext": CMakeBuild}, python_requires=">=3.8",