nnspike.unit

Hardware control and management units for LEGO SPIKE robot.

This module provides classes and utilities for controlling and managing the LEGO SPIKE robot hardware, including communication, mode management, and real-time data streaming.

Modules:

action_chain: Sequential action execution for robot behaviors etrobot: Main robot control interface for LEGO SPIKE hardware mode_manager: Robot operation mode management and switching spike_status: Robot status monitoring and reporting webcam_video_stream: Real-time video streaming and processing

Example

Basic robot initialization and control:

from nnspike.unit import ETRobot, ModeManager

# Initialize robot robot = ETRobot() mode_manager = ModeManager(robot)

# Start line following mode mode_manager.set_mode(“line_follow”)

class nnspike.unit.ActionChain(et, course)[source]

A class to manage a sequence of actions for an ETRobot.

This class allows you to define a chain of actions, each consisting of setting left and right motor speeds for a specified duration.

__init__(et, course)[source]
avoid_obstacle(init_flag)[source]

Perform a sequence of actions to avoid an obstacle.

Parameters:

et (ETRobot) – The ETRobot instance to control.

Return type:

tuple[float | None, tuple[int, int] | None, Mode]

carry_bottle_phase1(image, predicted_x, init_flag)[source]
Return type:

tuple[float | None, tuple[int, int] | None, Mode]

carry_bottle_phase10(image, predicted_x, init_flag)[source]
Return type:

tuple[float | None, tuple[int, int] | None, Mode]

carry_bottle_phase11(image, predicted_x, init_flag)[source]
Return type:

tuple[float | None, tuple[int, int] | None, Mode]

carry_bottle_phase12(image, predicted_x, init_flag)[source]
Return type:

tuple[float | None, tuple[int, int] | None, Mode]

carry_bottle_phase13(image, predicted_x, init_flag)[source]
Return type:

tuple[float | None, tuple[int, int] | None, Mode]

carry_bottle_phase14(image, predicted_x, init_flag)[source]
Return type:

tuple[float | None, tuple[int, int] | None, Mode]

carry_bottle_phase15(image, predicted_x, init_flag)[source]
Return type:

tuple[float | None, tuple[int, int] | None, Mode]

carry_bottle_phase16(image, predicted_x, init_flag)[source]
Return type:

tuple[float | None, tuple[int, int] | None, Mode]

carry_bottle_phase17(image, predicted_x, init_flag)[source]
Return type:

tuple[float | None, tuple[int, int] | None, Mode]

carry_bottle_phase18(image, predicted_x, init_flag)[source]
Return type:

tuple[float | None, tuple[int, int] | None, Mode]

carry_bottle_phase19(image, predicted_x, init_flag)[source]
Return type:

tuple[float | None, tuple[int, int] | None, Mode]

carry_bottle_phase2(image, predicted_x, init_flag)[source]
Return type:

tuple[float | None, tuple[int, int] | None, Mode]

carry_bottle_phase3(image, predicted_x, init_flag)[source]
Return type:

tuple[float | None, tuple[int, int] | None, Mode]

carry_bottle_phase4(image, predicted_x, init_flag)[source]
Return type:

tuple[float | None, tuple[int, int] | None, Mode]

carry_bottle_phase5(image, predicted_x, init_flag)[source]
Return type:

tuple[float | None, tuple[int, int] | None, Mode]

carry_bottle_phase6(image, predicted_x, init_flag)[source]
Return type:

tuple[float | None, tuple[int, int] | None, Mode]

carry_bottle_phase7(image, predicted_x, init_flag)[source]
Return type:

tuple[float | None, tuple[int, int] | None, Mode]

carry_bottle_phase8(image, predicted_x, init_flag)[source]
Return type:

tuple[float | None, tuple[int, int] | None, Mode]

carry_bottle_phase9(image, predicted_x, init_flag)[source]
Return type:

tuple[float | None, tuple[int, int] | None, Mode]

follow_left_edge(image, predicted_x)[source]
Return type:

tuple[float | None, tuple[int, int] | None, Mode]

follow_right_edge(image, predicted_x)[source]
Return type:

tuple[float | None, tuple[int, int] | None, Mode]

reset()[source]

Reset the action chain by setting the start position to 0.

Return type:

None

class nnspike.unit.ETRobot(port='/dev/ttyACM0')[source]

Interface for communicating with LEGO SPIKE Prime robot over serial connection.

This class provides methods to send commands to and receive sensor data from a LEGO SPIKE Prime robot. It maintains a continuous background thread to receive sensor updates and handles motor commands.

Command IDs should be as same as (spike/slot_prod.py) the script in LEGO Spike Prime.

COMMAND_SET_MOTOR_FORWARD_SPEED_ID

Command ID for setting forward motor speed.

Type:

int

COMMAND_SET_MOTOR_BACKWARD_SPEED_ID

Command ID for setting backward motor speed.

Type:

int

COMMAND_SET_MOTOR_RELATIVE_POSITION_ID

Command ID for relative position movement.

Type:

int

COMMAND_STOP_MOTOR_ID

Command ID for stopping motors.

Type:

int

COMMAND_MOVE_ARM_ID

Command ID for moving robot arm.

Type:

int

CMD_FLAG

Command flag prefix for serial communication.

Type:

bytes

DUMMY

Dummy value for command parameters.

Type:

int

CMD_FLAG = b'CF:'
COMMAND_MOVE_ARM_ID = 205
COMMAND_SET_MOTOR_BACKWARD_SPEED_ID = 202
COMMAND_SET_MOTOR_FORWARD_SPEED_ID = 201
COMMAND_SET_MOTOR_RELATIVE_POSITION_ID = 203
COMMAND_STOP_MOTOR_ID = 204
DUMMY = 1
__init__(port='/dev/ttyACM0')[source]

Initialize the ETRobot with a serial connection.

Parameters:

port (str, optional) – The serial port to connect to. Defaults to “/dev/ttyACM0”.

brake()[source]

Brake the motors of the ETRobot.

Return type:

None

get_spike_status()[source]

Get the spike status with last known good sensor values.

Returns:

Spike status object with consistent sensor data

Return type:

SpikeStatus

move_arm(action, duration=0.5)[source]

Move the arm up or down.

Parameters:

action (int) – Action to perform (0 = move up, 1 = move down, 2 = stop arm).

Return type:

None

receive()[source]

Update ETRobot motor and sensor status by the received data from the GPIO port.

Note

The update rate should be less than the rate of sending sensor data in LEGO Prime Hub (0.0005 seconds).

Return type:

None

retrieve_motors_relative_position()[source]

Retrieve the relative positions of the motors.

Returns:

The sum of the absolute values of the relative positions of both motors.

Return type:

int

set_motor_relative_position(left_positon, right_position)[source]
Return type:

None

set_motor_speed(left_speed, right_speed)[source]

Set the ETRobot motor’s speed.

Parameters:
  • left_speed (int) – Left motor speed (-100-100).

  • right_speed (int) – Right motor speed (-100-100).

Return type:

None

stop()[source]

Stop the robot and close the serial port.

Return type:

None

class nnspike.unit.ModeManager(course)[source]

Manages the current mode of the robot unit based on image analysis and sensor data.

This class is responsible for determining and transitioning between different behavior modes including edge following, obstacle avoidance, and various bottle carrying phases. Mode transitions are determined based on motor position thresholds, image processing results (color detection), and sensor readings.

The class uses toggle flags to prevent repeated transitions and ensures each mode change occurs only once when conditions are met.

__init__(course)[source]

Initialize the ModeManager with a specified course direction.

Parameters:

course (str) – The direction of the course, either “left” or “right”

decide_next_mode(image, et)[source]
Return type:

tuple[Mode, bool]

get_current_mode()[source]

Get the current mode of the action chain.

Return type:

Mode

get_previous_mode()[source]

Get the previous mode of the action chain.

Return type:

Mode

set_current_mode(mode)[source]
Return type:

None

class nnspike.unit.WebcamVideoStream(src, save_video, save_path='', resolution=(640, 320), fps=30)[source]

A threaded video stream reader for webcams with optional video recording.

This class provides a non-blocking way to read frames from a video source by running the capture loop in a separate thread. It supports configurable resolution, frame rate, and optional video recording to file.

The threaded approach helps prevent frame drops and provides smoother video processing by maintaining a minimal buffer size and continuous frame updates in the background.

Parameters:
  • src (int | str) – Video source - can be camera index (int) or video file path (str)

  • save_video (bool) – Whether to save captured video to file

  • save_path (str) – Path for saving video file (required if save_video is True)

  • resolution (tuple) – Video resolution as (width, height) tuple. Default: (640, 320)

  • fps (int) – Frames per second for capture and recording. Default: 30

Example

>>> stream = WebcamVideoStream(src=0, save_video=False)
>>> stream.start()
>>> grabbed, frame = stream.read()
>>> stream.stop()
__init__(src, save_video, save_path='', resolution=(640, 320), fps=30)[source]
read()[source]

Read the most recently captured frame.

If video recording is enabled, also writes the frame to the output file.

Return type:

tuple[bool, np.ndarray | None]

Returns:

Tuple of (success_flag, frame) where success_flag indicates if the frame was successfully captured and frame is the image data as a numpy array, or None if capture failed.

start()[source]

Start the background thread for reading video frames.

Return type:

WebcamVideoStream

Returns:

Self reference for method chaining.

stop()[source]

Stop the video stream and clean up resources.

This method stops the background thread, releases the video writer (if recording), and releases the video capture stream.

Return type:

None

update()[source]

Continuously update frames from the video stream in a background thread.

This method runs in a loop until stopped, constantly reading new frames from the video source to keep the frame buffer current.

Return type:

None

class nnspike.unit.SpikeStatus(raw_data=None)[source]

Class to represent and access the status of a Lego Spike Prime hub.

This class provides a structured way to access the data received from the Spike Prime, including sensor readings, motor positions, and battery status.

__init__(raw_data=None)[source]

Initialize the SpikeStatus object.

Parameters:

raw_data (str | bytes | dict | None) – Optional raw data from the Spike Prime to parse

__str__()[source]

Return a string representation of the status.

Return type:

str

update(data)[source]

Update the status with new data from the Spike Prime.

Parameters:

data (str | bytes | dict) – Raw data from the Spike Prime (string, bytes, or dictionary)

Return type:

None

Modules

action_chain

etrobot

mode_manager

spike_status

webcam_video_stream