nnspike package
nnspike: A LEGO SPIKE Robot using Neural Network for line following.
This package provides tools and utilities for controlling a LEGO SPIKE robot using neural network-based computer vision for line following tasks.
- The package includes:
Unit control classes for robot communication and management
Neural network models for vision-based navigation
Data processing and augmentation utilities
Image processing and control algorithms
Submodules
nnspike.constants module
Constants and enumerations used throughout the nnspike package.
This module defines constants for robot control, camera parameters, and behavior modes used by the LEGO SPIKE robot for line following and navigation tasks.
- class nnspike.constants.Mode(*values)[source]
Bases:
Enum
- FOLLOW_LEFT_EDGE = 0
- FOLLOW_RIGHT_EDGE = 1
- AVOID_OBSTACLE = 2
- CARRY_BOTTLE_PHASE1 = 3
- CARRY_BOTTLE_PHASE2 = 4
- CARRY_BOTTLE_PHASE3 = 5
- CARRY_BOTTLE_PHASE4 = 6
- CARRY_BOTTLE_PHASE5 = 7
- CARRY_BOTTLE_PHASE6 = 8
- CARRY_BOTTLE_PHASE7 = 9
- CARRY_BOTTLE_PHASE8 = 10
- CARRY_BOTTLE_PHASE9 = 11
- CARRY_BOTTLE_PHASE10 = 12
- CARRY_BOTTLE_PHASE11 = 13
- CARRY_BOTTLE_PHASE12 = 14
- CARRY_BOTTLE_PHASE13 = 15
- CARRY_BOTTLE_PHASE14 = 16
- CARRY_BOTTLE_PHASE15 = 17
- CARRY_BOTTLE_PHASE16 = 18
- CARRY_BOTTLE_PHASE17 = 19
- CARRY_BOTTLE_PHASE18 = 20
- CARRY_BOTTLE_PHASE19 = 21
- GOAL = 22
- PHASE_COMPLETED = 99
- MOVE_FORWARD = 100
- MOVE_FORWARD_LEFT = 110
- MOVE_FORWARD_RIGHT = 120
- TURN_LEFT = 130
- TURN_RIGHT = 140
- MOVE_BACKWARD = 150
- PAUSE = 160
- REMOTE_CONTROL = 999
nnspike.data package
Data processing and dataset management for nnspike.
This module provides comprehensive data handling capabilities for neural network training with the LEGO SPIKE robot, including dataset classes, data augmentation, and preprocessing utilities.
- Modules:
aug: Data augmentation functions for improving model robustness dataset: PyTorch dataset classes for different learning tasks preprocess: Data preprocessing and label management utilities
Example
Basic usage for creating and augmenting a dataset:
from nnspike.data import RegressionDataset, augment_dataset
# Create a dataset dataset = RegressionDataset(data_dir=”path/to/data”)
# Apply augmentation augmented_data = augment_dataset(dataset, factor=2)
- nnspike.data.random_shift_scale_rotate(image, shift_limit=0.0625, scale_limit=0.1, rotate_limit=15)[source]
Apply a random shift, scale, and rotation transformation to an input image.
- Parameters:
image (np.ndarray) – The input image to be transformed.
shift_limit (float, optional) – Maximum fraction of total height/width to shift the image. Default is 0.0625.
scale_limit (float, optional) – Maximum scaling factor. Default is 0.1.
rotate_limit (int, optional) – Maximum rotation angle in degrees. Default is 15.
- Returns:
- A tuple containing:
transformed_image (np.ndarray): The transformed image.
params (dict): The parameters used for the transformation.
- Return type:
tuple
Example
>>> import numpy as np >>> image = np.random.rand(100, 100, 3) >>> transformed_image, params = random_shift_scale_rotate(image)
- nnspike.data.augment_dataset(df, p, export_path)[source]
Augment images in a dataset based on specified conditions and save them to a new location.
This function filters a DataFrame to include only rows where ‘use’ is True, then randomly selects rows based on probability p. For each selected row, it loads the image, applies sequential transformations (shift-scale-rotate followed by perspective transform), and saves the augmented image to the export directory. The function returns a new DataFrame containing metadata for all augmented images.
- Parameters:
df (pd.DataFrame) – DataFrame containing image paths and metadata. Must include columns: ‘image_path’, ‘use’, ‘frame_number’, ‘mode’, ‘course’, ‘motor_a_relative_position’, ‘motor_b_relative_position’, ‘data_type’.
p (float) – Probability threshold (0.0-1.0) for applying augmentation to each row where ‘use’ is True. Higher values result in more augmented images.
export_path (str) – Directory path where augmented images will be saved. Must not already exist - the function will create it.
- Returns:
- DataFrame containing metadata of augmented images with the same
structure as the input DataFrame. The ‘image_path’ column will contain paths to the newly created augmented images, and ‘target_x’, ‘left_x’, ‘right_x’ will be set to None.
- Return type:
pd.DataFrame
- Raises:
FileExistsError – If the export_path directory already exists.
Example
>>> import pandas as pd >>> df = pd.DataFrame( ... { ... "image_path": ["/path/to/image1.jpg", "/path/to/image2.jpg"], ... "use": [True, True], ... "frame_number": [1, 2], ... "mode": ["A", "B"], ... "course": ["course1", "course1"], ... "motor_a_relative_position": [0, 10], ... "motor_b_relative_position": [0, -5], ... "data_type": ["train", "train"], ... } ... ) >>> augmented_df = augment_dataset(df, p=0.5, export_path="/path/to/augmented")
Note
The function applies two sequential transformations: 1. Random shift, scale, rotate with limits: shift_limit=0.05, scale_limit=0.05, rotate_limit=5 2. Perspective transform with scale=(0.01, 0.05)
- class nnspike.data.RegressionDataset(inputs, outputs, roi, train_course, position_variation=0.00625)[source]
Bases:
Dataset
Dataset for regression tasks, particularly for predicting continuous values like steering angles.
This dataset handles image loading, preprocessing, and augmentation for regression tasks. It supports data augmentation through random brightness/contrast adjustments and RGB shifts.
- preprocess
Transform to convert numpy arrays to PyTorch tensors.
- Type:
transforms.ToTensor
- class nnspike.data.ClassificationDataset(inputs, outputs, roi, train_course, position_variation=0.00625, transform=None)[source]
Bases:
Dataset
Dataset for classification tasks, particularly for predicting discrete modes of operation.
This dataset handles image loading, preprocessing, and augmentation for classification tasks. It supports data augmentation through random brightness/contrast adjustments, RGB shifts, and optional custom transforms.
- preprocess
Transform to convert numpy arrays to PyTorch tensors.
- Type:
transforms.ToTensor
- class nnspike.data.MultiTaskDataset(inputs, outputs, roi, train_course, position_variation=0.00625)[source]
Bases:
Dataset
Dataset for multi-task learning, combining both regression and classification tasks.
This dataset handles image loading, preprocessing, and augmentation for both regression and classification tasks simultaneously. It supports data augmentation through random brightness/contrast adjustments and RGB shifts.
- preprocess
Transform to convert numpy arrays to PyTorch tensors.
- Type:
transforms.ToTensor
- nnspike.data.balance_dataset(df, col_name, max_samples, num_bins)[source]
Balances the dataset by limiting the number of samples in each bin of a specified column.
This function creates a histogram of the specified column and ensures that no bin has more than max_samples samples. If a bin exceeds this limit, excess samples are randomly removed to balance the dataset.
- Parameters:
df (pd.DataFrame) – The input DataFrame containing the data to be balanced.
col_name (str) – The name of the column to be used for creating bins.
max_samples (int) – The maximum number of samples allowed per bin.
num_bins (int) – The number of bins to divide the column into.
- Returns:
A DataFrame with the dataset balanced according to the specified column and bin limits.
- Return type:
pd.DataFrame
Note
- Make sure the column does not have
None/Nan
empty string
Otherwise, ValueError: autodetected range of [nan, nan] is not finite may raise
- nnspike.data.sort_by_frames_number(df)[source]
Sorts a DataFrame by the frame number extracted from the ‘image_path’ column.
This function extracts the frame number from the ‘image_path’ column of the DataFrame, sorts the DataFrame based on these frame numbers, and keeps the ‘frame_number’ column as the 2nd column in the DataFrame.
- Parameters:
df (pd.DataFrame) – The input DataFrame containing an ‘image_path’ column with file paths that include frame numbers in the format ‘frame_<number>’.
- Returns:
The sorted DataFrame with rows ordered by the extracted frame numbers and the frame_number column as the 2nd column.
- Return type:
pd.DataFrame
- nnspike.data.create_label_dataframe(path_pattern, course)[source]
Creates a comprehensive DataFrame with image paths and associated metadata for labeling tasks.
This function searches for image files matching the given path pattern and constructs a DataFrame containing the paths to these images along with multiple columns for sensor data and robot control information. The DataFrame includes columns for: - Image metadata: image_path, course, data_type, use flag - Target and mode information: mode, target_x - Motor data: motor_a_speed, motor_b_speed, motor_a_relative_position, motor_b_relative_position - Sensor readings: distance_sensor, color_reflected, color_ambient, color_value
All sensor and motor columns are initialized with default values (0 for numeric fields, NaN for mode and target_x, True for use flag) to be populated later by other functions.
- Parameters:
path_pattern (str) – A glob pattern to match image file paths.
course (str) – The name of the course associated with the images.
- Returns:
A DataFrame containing the image paths and comprehensive metadata columns with default values for subsequent data population.
- Return type:
pd.DataFrame
- nnspike.data.set_spike_status(label_df, status_df)[source]
Merges comprehensive sensor and motor data from status_df into label_df based on matching frame numbers.
This function performs a comprehensive merge of robot sensor and control data from status_df into label_df by matching frame_number values. It updates multiple columns including: - Motor control: motor_a_speed, motor_b_speed, motor_a_relative_position, motor_b_relative_position - Sensor readings: distance_sensor, color_reflected, color_ambient, color_value - Robot mode: mode (preserving manual labels from label_df when available)
The merge is performed as a left join, preserving all rows in label_df and only updating sensor/motor data where matching frame numbers exist in status_df. If label_df doesn’t have a frame_number column, it will be automatically added by calling sort_by_frames_number().
- Parameters:
label_df (pd.DataFrame) – The label DataFrame containing image paths and metadata. Must have or be able to generate a frame_number column.
status_df (pd.DataFrame) – The status DataFrame containing comprehensive sensor and motor data with frame_number column for matching.
- Returns:
The updated label DataFrame with sensor and motor data merged from status_df. All original columns are preserved, and sensor data is filled where frame matches exist.
- Return type:
pd.DataFrame
nnspike.data.aug module
Image augmentation utilities for neural network training data.
This module provides functions for applying various image transformations to augment training datasets. It includes geometric transformations such as shift, scale, rotation, and perspective distortion to improve model robustness and generalization.
The module is designed to work with computer vision datasets, particularly for robot navigation and control tasks. It uses the Albumentations library for efficient and reproducible image transformations.
Examples
Basic usage for single image transformations:
>>> import numpy as np
>>> from nnspike.data.aug import (
... random_shift_scale_rotate,
... perspective_transform,
... )
>>>
>>> # Load an image (example with random data)
>>> image = np.random.rand(224, 224, 3).astype(np.uint8)
>>>
>>> # Apply shift, scale, and rotation
>>> aug_image, params = random_shift_scale_rotate(image)
>>>
>>> # Apply perspective transformation
>>> aug_image = perspective_transform(aug_image)
Dataset augmentation workflow:
>>> import pandas as pd
>>> from nnspike.data.aug import augment_dataset
>>>
>>> # Prepare DataFrame with image metadata
>>> df = pd.DataFrame(
... {
... "image_path": ["/path/to/img1.jpg", "/path/to/img2.jpg"],
... "use": [True, True],
... "frame_number": [1, 2],
... "mode": ["train", "train"],
... # ... other required columns
... }
... )
>>>
>>> # Augment 50% of the usable images
>>> augmented_df = augment_dataset(df, p=0.5, export_path="/path/to/augmented")
- This module contains the following main functions
- - random_shift_scale_rotate
Applies geometric transformations with controllable parameters
- - perspective_transform
Applies perspective distortion to simulate camera angle changes
- - augment_dataset
Batch processes DataFrame of images with augmentation pipeline
Note
All augmentation functions preserve the original image format and dimensions unless specifically configured otherwise. The transformations are designed to maintain realistic image characteristics suitable for robot vision tasks.
- Dependencies:
albumentations: For efficient image transformations
opencv-python (cv2): For image I/O operations
numpy: For array operations
pandas: For dataset metadata management
tqdm: For progress tracking during batch operations
- nnspike.data.aug.random_shift_scale_rotate(image, shift_limit=0.0625, scale_limit=0.1, rotate_limit=15)[source]
Apply a random shift, scale, and rotation transformation to an input image.
- Parameters:
image (np.ndarray) – The input image to be transformed.
shift_limit (float, optional) – Maximum fraction of total height/width to shift the image. Default is 0.0625.
scale_limit (float, optional) – Maximum scaling factor. Default is 0.1.
rotate_limit (int, optional) – Maximum rotation angle in degrees. Default is 15.
- Returns:
- A tuple containing:
transformed_image (np.ndarray): The transformed image.
params (dict): The parameters used for the transformation.
- Return type:
tuple
Example
>>> import numpy as np >>> image = np.random.rand(100, 100, 3) >>> transformed_image, params = random_shift_scale_rotate(image)
- nnspike.data.aug.perspective_transform(image, scale=(0.01, 0.05), keep_size=True)[source]
Apply a perspective transformation to an input image.
- Parameters:
image (np.ndarray) – The input image to be transformed.
scale (tuple, optional) – Range for perspective distortion scale. Default is (0.01, 0.05).
keep_size (bool, optional) – Whether to keep the original image size. Default is True.
- Returns:
The transformed image.
- Return type:
np.ndarray
Example
>>> import numpy as np >>> image = np.random.rand(100, 100, 3) >>> transformed_image = perspective_transform(image)
- nnspike.data.aug.augment_dataset(df, p, export_path)[source]
Augment images in a dataset based on specified conditions and save them to a new location.
This function filters a DataFrame to include only rows where ‘use’ is True, then randomly selects rows based on probability p. For each selected row, it loads the image, applies sequential transformations (shift-scale-rotate followed by perspective transform), and saves the augmented image to the export directory. The function returns a new DataFrame containing metadata for all augmented images.
- Parameters:
df (pd.DataFrame) – DataFrame containing image paths and metadata. Must include columns: ‘image_path’, ‘use’, ‘frame_number’, ‘mode’, ‘course’, ‘motor_a_relative_position’, ‘motor_b_relative_position’, ‘data_type’.
p (float) – Probability threshold (0.0-1.0) for applying augmentation to each row where ‘use’ is True. Higher values result in more augmented images.
export_path (str) – Directory path where augmented images will be saved. Must not already exist - the function will create it.
- Returns:
- DataFrame containing metadata of augmented images with the same
structure as the input DataFrame. The ‘image_path’ column will contain paths to the newly created augmented images, and ‘target_x’, ‘left_x’, ‘right_x’ will be set to None.
- Return type:
pd.DataFrame
- Raises:
FileExistsError – If the export_path directory already exists.
Example
>>> import pandas as pd >>> df = pd.DataFrame( ... { ... "image_path": ["/path/to/image1.jpg", "/path/to/image2.jpg"], ... "use": [True, True], ... "frame_number": [1, 2], ... "mode": ["A", "B"], ... "course": ["course1", "course1"], ... "motor_a_relative_position": [0, 10], ... "motor_b_relative_position": [0, -5], ... "data_type": ["train", "train"], ... } ... ) >>> augmented_df = augment_dataset(df, p=0.5, export_path="/path/to/augmented")
Note
The function applies two sequential transformations: 1. Random shift, scale, rotate with limits: shift_limit=0.05, scale_limit=0.05, rotate_limit=5 2. Perspective transform with scale=(0.01, 0.05)
nnspike.data.dataset module
This module defines custom PyTorch Datasets for driving records, including image preprocessing and augmentation.
This module provides dataset classes for different machine learning tasks including regression, classification, and multi-task learning. All datasets support data augmentation through brightness/contrast adjustments, RGB shifts, and horizontal flipping based on course direction.
- Modules:
cv2: OpenCV library for image processing.
torch: PyTorch library for tensor operations and neural networks.
albumentations as A: Albumentations library for image augmentations.
torchvision.transforms as transforms: PyTorch’s torchvision library for common image transformations.
numpy as np: NumPy library for numerical operations.
torch.utils.data.Dataset: Base class for all datasets in PyTorch.
nnspike.utils.normalize_image: Custom function for image normalization.
nnspike.constants.Mode: Enumeration for different operation modes.
- Constants:
transform_bright_shift (albumentations.ReplayCompose): Augmentation pipeline with random brightness/contrast adjustments and RGB shifts.
transform_flip (albumentations.Compose): Augmentation pipeline for horizontal flipping of images.
- Functions:
_rand_relative_position(relative_position, position_variation): Adds random variation to relative position for data augmentation.
- Classes:
RegressionDataset(Dataset): Custom dataset class for regression tasks, predicting continuous values like steering angles.
ClassificationDataset(Dataset): Custom dataset class for classification tasks, predicting discrete modes of operation.
MultiTaskDataset(Dataset): Custom dataset class for multi-task learning, combining both regression and classification.
- RegressionDataset Class:
Designed for predicting continuous values from images and relative position data.
- Methods:
- __init__(self, inputs, outputs, roi, train_course, position_variation=0.00625):
Initializes the dataset with input data, target values, region of interest, training course, and position variation.
- __len__(self):
Returns the number of samples in the dataset.
- __getitem__(self, idx):
Retrieves and processes the sample at the given index. Returns tuple of (roi_tensor, relative_position) and target_x.
- ClassificationDataset Class:
Designed for predicting discrete modes/classes from images and relative position data.
- Methods:
- __init__(self, inputs, outputs, roi, train_course, position_variation=0.00625, transform=None):
Initializes the dataset with input data, class labels, region of interest, training course, position variation, and optional transforms.
- __len__(self):
Returns the number of samples in the dataset.
- __getitem__(self, idx):
Retrieves and processes the sample at the given index. Returns tuple of (roi_tensor, relative_position) and mode.
- MultiTaskDataset Class:
Designed for simultaneous regression and classification tasks from the same input data.
- Methods:
- __init__(self, inputs, outputs, roi, train_course, position_variation=0.00625):
Initializes the dataset with input data, combined outputs (mode, target), region of interest, training course, and position variation.
- __len__(self):
Returns the number of samples in the dataset.
- __getitem__(self, idx):
Retrieves and processes the sample at the given index. Returns tuple of (roi_tensor, relative_position) and (mode, target_x).
- Usage Examples:
# Regression dataset for predicting steering angles regression_dataset = RegressionDataset(
inputs=[(‘path/to/image.png’, 0.5, ‘course1’)], outputs=[100.0], roi=np.array([0, 0, 200, 200]), train_course=’course1’
)
# Classification dataset for predicting operation modes classification_dataset = ClassificationDataset(
inputs=[(‘path/to/image.png’, 0.5, ‘course1’)], outputs=[1], roi=np.array([0, 0, 200, 200]), train_course=’course1’
)
# Multi-task dataset for both regression and classification multitask_dataset = MultiTaskDataset(
inputs=[(‘path/to/image.png’, 0.5, ‘course1’)], outputs=[(1, 100.0)], roi=np.array([0, 0, 200, 200]), train_course=’course1’
)
# Using with DataLoader dataloader = torch.utils.data.DataLoader(regression_dataset, batch_size=4, shuffle=True) for (roi_area, relative_position), target in dataloader:
# Training loop here pass
Notes
All datasets apply data augmentation including brightness/contrast adjustments and RGB shifts.
Images are horizontally flipped when the course differs from the training course.
Relative positions are augmented with random variation for improved generalization.
Target values are normalized to [0, 1] range for regression tasks.
The normalize_image function is used for image preprocessing.
ROI (Region of Interest) defines the area of the image to extract for processing.
- class nnspike.data.dataset.RegressionDataset(inputs, outputs, roi, train_course, position_variation=0.00625)[source]
Bases:
Dataset
Dataset for regression tasks, particularly for predicting continuous values like steering angles.
This dataset handles image loading, preprocessing, and augmentation for regression tasks. It supports data augmentation through random brightness/contrast adjustments and RGB shifts.
- preprocess
Transform to convert numpy arrays to PyTorch tensors.
- Type:
transforms.ToTensor
- class nnspike.data.dataset.ClassificationDataset(inputs, outputs, roi, train_course, position_variation=0.00625, transform=None)[source]
Bases:
Dataset
Dataset for classification tasks, particularly for predicting discrete modes of operation.
This dataset handles image loading, preprocessing, and augmentation for classification tasks. It supports data augmentation through random brightness/contrast adjustments, RGB shifts, and optional custom transforms.
- preprocess
Transform to convert numpy arrays to PyTorch tensors.
- Type:
transforms.ToTensor
- class nnspike.data.dataset.MultiTaskDataset(inputs, outputs, roi, train_course, position_variation=0.00625)[source]
Bases:
Dataset
Dataset for multi-task learning, combining both regression and classification tasks.
This dataset handles image loading, preprocessing, and augmentation for both regression and classification tasks simultaneously. It supports data augmentation through random brightness/contrast adjustments and RGB shifts.
- preprocess
Transform to convert numpy arrays to PyTorch tensors.
- Type:
transforms.ToTensor
nnspike.data.preprocess module
This module provides functions for preprocessing and preparing datasets of image frames with sensor data.
The module contains utilities for creating labeled datasets from image files, balancing data distributions, sorting by frame numbers, and merging sensor status data with image metadata. It’s designed to work with robot control data that includes motor positions, sensor readings, and image frames.
- Functions:
- balance_dataset(df: pd.DataFrame, col_name: str, max_samples: int, num_bins: int) -> pd.DataFrame:
Balances the dataset by limiting the number of samples in each bin of a specified column. Uses histogram binning to ensure uniform distribution across value ranges.
- sort_by_frames_number(df: pd.DataFrame) -> pd.DataFrame:
Sorts a DataFrame by the frame number extracted from the ‘image_path’ column and adds the frame_number column as the second column in the DataFrame.
- create_label_dataframe(path_pattern: str, course: str) -> pd.DataFrame:
Creates a comprehensive DataFrame with image paths and associated metadata columns including motor speeds, positions, sensor readings, and labeling fields. Initializes all sensor columns with default values for subsequent population.
- set_spike_status(label_df: pd.DataFrame, status_df: pd.DataFrame) -> pd.DataFrame:
Merges comprehensive sensor and motor data from status_df into label_df based on matching frame numbers. Updates multiple columns including motor speeds, positions, and various sensor readings (distance, color sensors).
- nnspike.data.preprocess.balance_dataset(df, col_name, max_samples, num_bins)[source]
Balances the dataset by limiting the number of samples in each bin of a specified column.
This function creates a histogram of the specified column and ensures that no bin has more than max_samples samples. If a bin exceeds this limit, excess samples are randomly removed to balance the dataset.
- Parameters:
df (pd.DataFrame) – The input DataFrame containing the data to be balanced.
col_name (str) – The name of the column to be used for creating bins.
max_samples (int) – The maximum number of samples allowed per bin.
num_bins (int) – The number of bins to divide the column into.
- Returns:
A DataFrame with the dataset balanced according to the specified column and bin limits.
- Return type:
pd.DataFrame
Note
- Make sure the column does not have
None/Nan
empty string
Otherwise, ValueError: autodetected range of [nan, nan] is not finite may raise
- nnspike.data.preprocess.sort_by_frames_number(df)[source]
Sorts a DataFrame by the frame number extracted from the ‘image_path’ column.
This function extracts the frame number from the ‘image_path’ column of the DataFrame, sorts the DataFrame based on these frame numbers, and keeps the ‘frame_number’ column as the 2nd column in the DataFrame.
- Parameters:
df (pd.DataFrame) – The input DataFrame containing an ‘image_path’ column with file paths that include frame numbers in the format ‘frame_<number>’.
- Returns:
The sorted DataFrame with rows ordered by the extracted frame numbers and the frame_number column as the 2nd column.
- Return type:
pd.DataFrame
- nnspike.data.preprocess.create_label_dataframe(path_pattern, course)[source]
Creates a comprehensive DataFrame with image paths and associated metadata for labeling tasks.
This function searches for image files matching the given path pattern and constructs a DataFrame containing the paths to these images along with multiple columns for sensor data and robot control information. The DataFrame includes columns for: - Image metadata: image_path, course, data_type, use flag - Target and mode information: mode, target_x - Motor data: motor_a_speed, motor_b_speed, motor_a_relative_position, motor_b_relative_position - Sensor readings: distance_sensor, color_reflected, color_ambient, color_value
All sensor and motor columns are initialized with default values (0 for numeric fields, NaN for mode and target_x, True for use flag) to be populated later by other functions.
- Parameters:
path_pattern (str) – A glob pattern to match image file paths.
course (str) – The name of the course associated with the images.
- Returns:
A DataFrame containing the image paths and comprehensive metadata columns with default values for subsequent data population.
- Return type:
pd.DataFrame
- nnspike.data.preprocess.set_spike_status(label_df, status_df)[source]
Merges comprehensive sensor and motor data from status_df into label_df based on matching frame numbers.
This function performs a comprehensive merge of robot sensor and control data from status_df into label_df by matching frame_number values. It updates multiple columns including: - Motor control: motor_a_speed, motor_b_speed, motor_a_relative_position, motor_b_relative_position - Sensor readings: distance_sensor, color_reflected, color_ambient, color_value - Robot mode: mode (preserving manual labels from label_df when available)
The merge is performed as a left join, preserving all rows in label_df and only updating sensor/motor data where matching frame numbers exist in status_df. If label_df doesn’t have a frame_number column, it will be automatically added by calling sort_by_frames_number().
- Parameters:
label_df (pd.DataFrame) – The label DataFrame containing image paths and metadata. Must have or be able to generate a frame_number column.
status_df (pd.DataFrame) – The status DataFrame containing comprehensive sensor and motor data with frame_number column for matching.
- Returns:
The updated label DataFrame with sensor and motor data merged from status_df. All original columns are preserved, and sensor data is filled where frame matches exist.
- Return type:
pd.DataFrame
nnspike.models package
Neural network models for vision-based robot navigation.
This module contains neural network architectures designed for the LEGO SPIKE robot’s line following and navigation tasks. It includes both custom and adapted models for different learning approaches.
- Modules:
customized: Custom neural network architectures for specific tasks loss: Custom loss functions for multi-task learning nvidia: Adapted NVIDIA models for regression and multi-task learning
Example
Creating and using a regression model:
from nnspike.models import NvidiaModelRegression
# Initialize model model = NvidiaModelRegression()
# Use model for inference prediction = model(input_tensor)
- class nnspike.models.SimpleNetClassification25(num_classes)[source]
Bases:
Module
A simple convolutional neural network for processing image data with additional sensor inputs.
This network consists of two convolutional layers followed by max pooling, and two fully connected layers. It accepts both image data and relative position information as inputs. The architecture is designed to handle input images and concatenate them with additional sensor data before final classification.
- Architecture:
Conv2d (3->8 channels, 5x5 kernel, stride=2) + ReLU + MaxPool2d (2x2)
Conv2d (8->16 channels, 5x5 kernel, stride=1, padding=2) + ReLU + MaxPool2d (2x2)
Flatten + Concatenate with relative position
Linear (2689->64) + ReLU
Linear (64->1) output
Expected input image size: (61, 197) which gets processed to (16, 7, 24) after convolutions.
- __init__(num_classes)[source]
Initialize the SimpleNetClassification25 model.
Sets up all layers including convolutional layers, pooling, and fully connected layers. The input size calculation assumes input images of size (61, 197).
- forward(x, relative_position)[source]
Forward pass through the network.
- Parameters:
x (torch.Tensor) – Input image tensor of shape (batch_size, 3, height, width). Expected input size is (batch_size, 3, 61, 197).
relative_position (torch.Tensor) – Relative position sensor data of shape (batch_size,) or (batch_size, 1). This additional sensor input is concatenated with the flattened convolutional features.
- Returns:
- Output logits of shape (batch_size, 1). These are raw
output values that can be used for regression or passed through a sigmoid for binary classification.
- Return type:
torch.Tensor
Note
The network expects input images of size (61, 197). After the first conv+pool operation, the spatial dimensions become approximately (32, 32), and after the second conv+pool operation, they become (16, 16). The comment dimensions may not be accurate for all input sizes.
- class nnspike.models.NvidiaModelMultiTask(num_modes)[source]
Bases:
Module
A neural network model based on the NVIDIA architecture for end-to-end learning of self-driving cars.
This model consists of five convolutional layers followed by four fully connected layers. The ELU activation function is used after each layer except the final output layer. Additionally, an interval input is concatenated with the flattened output from the convolutional layers before being passed through the fully connected layers.
- conv1
First convolutional layer with 3 input channels and 24 output channels.
- Type:
nn.Conv2d
- conv2
Second convolutional layer with 24 input channels and 36 output channels.
- Type:
nn.Conv2d
- conv3
Third convolutional layer with 36 input channels and 48 output channels.
- Type:
nn.Conv2d
- conv4
Fourth convolutional layer with 48 input channels and 64 output channels.
- Type:
nn.Conv2d
- conv5
Fifth convolutional layer with 64 input channels and 64 output channels.
- Type:
nn.Conv2d
- flatten
Layer to flatten the output from the convolutional layers.
- Type:
nn.Flatten
- fc1
First fully connected layer with input size adjusted to include sensor inputs.
- Type:
nn.Linear
- fc2
Second fully connected layer.
- Type:
nn.Linear
- fc3
Third fully connected layer.
- Type:
nn.Linear
- mode_classifier
Output layer for behavior mode classification (4 modes).
- Type:
nn.Linear
- self_driving_head
Output layer for self-driving control.
- Type:
nn.Linear
- elu
Exponential Linear Unit activation function applied after each layer except the final output layer.
- Type:
nn.ELU
- softmax
Softmax activation for mode classification.
- Type:
nn.Softmax
- forward(x, left_x, right_x, relative_position)[source]
Defines the forward pass of the model. Takes an image tensor x and additional sensor inputs, processes them through the network, and returns two output tensors: mode classification and control.
- Parameters:
x (torch.Tensor) – Input image tensor of shape (batch_size, 3, height, width).
left_x (torch.Tensor) – Left sensor input tensor of shape (batch_size, 1).
right_x (torch.Tensor) – Right sensor input tensor of shape (batch_size, 1).
relative_position (torch.Tensor) – Relative position tensor of shape (batch_size, 1).
- Returns:
mode_output: Softmax probabilities for robot behavior modes (batch_size, 4) [left_x following, right_x following, obstacle avoidance, self driving]
control_output: Control tensor for self-driving mode (batch_size, 1)
- Return type:
tuple[torch.Tensor, torch.Tensor]
- class nnspike.models.NvidiaModelRegression[source]
Bases:
Module
A neural network model based on the NVIDIA architecture for end-to-end learning of self-driving cars.
This model consists of five convolutional layers followed by four fully connected layers. The ELU activation function is used after each layer except the final output layer. Additionally, an interval input is concatenated with the flattened output from the convolutional layers before being passed through the fully connected layers.
- conv1
First convolutional layer with 3 input channels and 24 output channels.
- Type:
nn.Conv2d
- conv2
Second convolutional layer with 24 input channels and 36 output channels.
- Type:
nn.Conv2d
- conv3
Third convolutional layer with 36 input channels and 48 output channels.
- Type:
nn.Conv2d
- conv4
Fourth convolutional layer with 48 input channels and 64 output channels.
- Type:
nn.Conv2d
- conv5
Fifth convolutional layer with 64 input channels and 64 output channels.
- Type:
nn.Conv2d
- flatten
Layer to flatten the output from the convolutional layers.
- Type:
nn.Flatten
- fc1
First fully connected layer with input size adjusted to include sensor inputs.
- Type:
nn.Linear
- fc2
Second fully connected layer.
- Type:
nn.Linear
- fc3
Third fully connected layer.
- Type:
nn.Linear
- mode_classifier
Output layer for behavior mode classification (4 modes).
- Type:
nn.Linear
- self_driving_head
Output layer for self-driving control.
- Type:
nn.Linear
- elu
Exponential Linear Unit activation function applied after each layer except the final output layer.
- Type:
nn.ELU
- softmax
Softmax activation for mode classification.
- Type:
nn.Softmax
- forward(x, left_x, right_x, relative_position)[source]
Defines the forward pass of the model. Takes an image tensor x and additional sensor inputs, processes them through the network, and returns two output tensors: mode classification and control.
- Parameters:
x (torch.Tensor) – Input image tensor of shape (batch_size, 3, height, width).
left_x (torch.Tensor) – Left sensor input tensor of shape (batch_size, 1).
right_x (torch.Tensor) – Right sensor input tensor of shape (batch_size, 1).
relative_position (torch.Tensor) – Relative position tensor of shape (batch_size, 1).
- Returns:
mode_output: Softmax probabilities for robot behavior modes (batch_size, 4) [left_x following, right_x following, obstacle avoidance, self driving]
control_output: Control tensor for self-driving mode (batch_size, 1)
- Return type:
tuple[torch.Tensor, torch.Tensor]
- class nnspike.models.MultiTaskLoss(mode_weight=1.0, control_weight=30.0, control_scale=10.0)[source]
Bases:
Module
nnspike.models.customized module
- class nnspike.models.customized.SimpleNetClassification25(num_classes)[source]
Bases:
Module
A simple convolutional neural network for processing image data with additional sensor inputs.
This network consists of two convolutional layers followed by max pooling, and two fully connected layers. It accepts both image data and relative position information as inputs. The architecture is designed to handle input images and concatenate them with additional sensor data before final classification.
- Architecture:
Conv2d (3->8 channels, 5x5 kernel, stride=2) + ReLU + MaxPool2d (2x2)
Conv2d (8->16 channels, 5x5 kernel, stride=1, padding=2) + ReLU + MaxPool2d (2x2)
Flatten + Concatenate with relative position
Linear (2689->64) + ReLU
Linear (64->1) output
Expected input image size: (61, 197) which gets processed to (16, 7, 24) after convolutions.
- __init__(num_classes)[source]
Initialize the SimpleNetClassification25 model.
Sets up all layers including convolutional layers, pooling, and fully connected layers. The input size calculation assumes input images of size (61, 197).
- forward(x, relative_position)[source]
Forward pass through the network.
- Parameters:
x (torch.Tensor) – Input image tensor of shape (batch_size, 3, height, width). Expected input size is (batch_size, 3, 61, 197).
relative_position (torch.Tensor) – Relative position sensor data of shape (batch_size,) or (batch_size, 1). This additional sensor input is concatenated with the flattened convolutional features.
- Returns:
- Output logits of shape (batch_size, 1). These are raw
output values that can be used for regression or passed through a sigmoid for binary classification.
- Return type:
torch.Tensor
Note
The network expects input images of size (61, 197). After the first conv+pool operation, the spatial dimensions become approximately (32, 32), and after the second conv+pool operation, they become (16, 16). The comment dimensions may not be accurate for all input sizes.
nnspike.models.loss module
nnspike.models.nvidia module
- class nnspike.models.nvidia.NvidiaModelRegression[source]
Bases:
Module
A neural network model based on the NVIDIA architecture for end-to-end learning of self-driving cars.
This model consists of five convolutional layers followed by four fully connected layers. The ELU activation function is used after each layer except the final output layer. Additionally, an interval input is concatenated with the flattened output from the convolutional layers before being passed through the fully connected layers.
- conv1
First convolutional layer with 3 input channels and 24 output channels.
- Type:
nn.Conv2d
- conv2
Second convolutional layer with 24 input channels and 36 output channels.
- Type:
nn.Conv2d
- conv3
Third convolutional layer with 36 input channels and 48 output channels.
- Type:
nn.Conv2d
- conv4
Fourth convolutional layer with 48 input channels and 64 output channels.
- Type:
nn.Conv2d
- conv5
Fifth convolutional layer with 64 input channels and 64 output channels.
- Type:
nn.Conv2d
- flatten
Layer to flatten the output from the convolutional layers.
- Type:
nn.Flatten
- fc1
First fully connected layer with input size adjusted to include sensor inputs.
- Type:
nn.Linear
- fc2
Second fully connected layer.
- Type:
nn.Linear
- fc3
Third fully connected layer.
- Type:
nn.Linear
- mode_classifier
Output layer for behavior mode classification (4 modes).
- Type:
nn.Linear
- self_driving_head
Output layer for self-driving control.
- Type:
nn.Linear
- elu
Exponential Linear Unit activation function applied after each layer except the final output layer.
- Type:
nn.ELU
- softmax
Softmax activation for mode classification.
- Type:
nn.Softmax
- forward(x, left_x, right_x, relative_position)[source]
Defines the forward pass of the model. Takes an image tensor x and additional sensor inputs, processes them through the network, and returns two output tensors: mode classification and control.
- Parameters:
x (torch.Tensor) – Input image tensor of shape (batch_size, 3, height, width).
left_x (torch.Tensor) – Left sensor input tensor of shape (batch_size, 1).
right_x (torch.Tensor) – Right sensor input tensor of shape (batch_size, 1).
relative_position (torch.Tensor) – Relative position tensor of shape (batch_size, 1).
- Returns:
mode_output: Softmax probabilities for robot behavior modes (batch_size, 4) [left_x following, right_x following, obstacle avoidance, self driving]
control_output: Control tensor for self-driving mode (batch_size, 1)
- Return type:
tuple[torch.Tensor, torch.Tensor]
- class nnspike.models.nvidia.NvidiaModelMultiTask(num_modes)[source]
Bases:
Module
A neural network model based on the NVIDIA architecture for end-to-end learning of self-driving cars.
This model consists of five convolutional layers followed by four fully connected layers. The ELU activation function is used after each layer except the final output layer. Additionally, an interval input is concatenated with the flattened output from the convolutional layers before being passed through the fully connected layers.
- conv1
First convolutional layer with 3 input channels and 24 output channels.
- Type:
nn.Conv2d
- conv2
Second convolutional layer with 24 input channels and 36 output channels.
- Type:
nn.Conv2d
- conv3
Third convolutional layer with 36 input channels and 48 output channels.
- Type:
nn.Conv2d
- conv4
Fourth convolutional layer with 48 input channels and 64 output channels.
- Type:
nn.Conv2d
- conv5
Fifth convolutional layer with 64 input channels and 64 output channels.
- Type:
nn.Conv2d
- flatten
Layer to flatten the output from the convolutional layers.
- Type:
nn.Flatten
- fc1
First fully connected layer with input size adjusted to include sensor inputs.
- Type:
nn.Linear
- fc2
Second fully connected layer.
- Type:
nn.Linear
- fc3
Third fully connected layer.
- Type:
nn.Linear
- mode_classifier
Output layer for behavior mode classification (4 modes).
- Type:
nn.Linear
- self_driving_head
Output layer for self-driving control.
- Type:
nn.Linear
- elu
Exponential Linear Unit activation function applied after each layer except the final output layer.
- Type:
nn.ELU
- softmax
Softmax activation for mode classification.
- Type:
nn.Softmax
- forward(x, left_x, right_x, relative_position)[source]
Defines the forward pass of the model. Takes an image tensor x and additional sensor inputs, processes them through the network, and returns two output tensors: mode classification and control.
- Parameters:
x (torch.Tensor) – Input image tensor of shape (batch_size, 3, height, width).
left_x (torch.Tensor) – Left sensor input tensor of shape (batch_size, 1).
right_x (torch.Tensor) – Right sensor input tensor of shape (batch_size, 1).
relative_position (torch.Tensor) – Relative position tensor of shape (batch_size, 1).
- Returns:
mode_output: Softmax probabilities for robot behavior modes (batch_size, 4) [left_x following, right_x following, obstacle avoidance, self driving]
control_output: Control tensor for self-driving mode (batch_size, 1)
- Return type:
tuple[torch.Tensor, torch.Tensor]
nnspike.unit package
Hardware control and management units for LEGO SPIKE robot.
This module provides classes and utilities for controlling and managing the LEGO SPIKE robot hardware, including communication, mode management, and real-time data streaming.
- Modules:
action_chain: Sequential action execution for robot behaviors etrobot: Main robot control interface for LEGO SPIKE hardware mode_manager: Robot operation mode management and switching spike_status: Robot status monitoring and reporting webcam_video_stream: Real-time video streaming and processing
Example
Basic robot initialization and control:
from nnspike.unit import ETRobot, ModeManager
# Initialize robot robot = ETRobot() mode_manager = ModeManager(robot)
# Start line following mode mode_manager.set_mode(“line_follow”)
- class nnspike.unit.ActionChain(et, course)[source]
Bases:
object
A class to manage a sequence of actions for an ETRobot.
This class allows you to define a chain of actions, each consisting of setting left and right motor speeds for a specified duration.
- carry_bottle_phase1(image, predicted_x, init_flag)[source]
- Return type:
tuple
[float
|None
,tuple
[int
,int
] |None
,Mode
]
- carry_bottle_phase10(image, predicted_x, init_flag)[source]
- Return type:
tuple
[float
|None
,tuple
[int
,int
] |None
,Mode
]
- carry_bottle_phase11(image, predicted_x, init_flag)[source]
- Return type:
tuple
[float
|None
,tuple
[int
,int
] |None
,Mode
]
- carry_bottle_phase12(image, predicted_x, init_flag)[source]
- Return type:
tuple
[float
|None
,tuple
[int
,int
] |None
,Mode
]
- carry_bottle_phase13(image, predicted_x, init_flag)[source]
- Return type:
tuple
[float
|None
,tuple
[int
,int
] |None
,Mode
]
- carry_bottle_phase14(image, predicted_x, init_flag)[source]
- Return type:
tuple
[float
|None
,tuple
[int
,int
] |None
,Mode
]
- carry_bottle_phase15(image, predicted_x, init_flag)[source]
- Return type:
tuple
[float
|None
,tuple
[int
,int
] |None
,Mode
]
- carry_bottle_phase16(image, predicted_x, init_flag)[source]
- Return type:
tuple
[float
|None
,tuple
[int
,int
] |None
,Mode
]
- carry_bottle_phase17(image, predicted_x, init_flag)[source]
- Return type:
tuple
[float
|None
,tuple
[int
,int
] |None
,Mode
]
- carry_bottle_phase18(image, predicted_x, init_flag)[source]
- Return type:
tuple
[float
|None
,tuple
[int
,int
] |None
,Mode
]
- carry_bottle_phase19(image, predicted_x, init_flag)[source]
- Return type:
tuple
[float
|None
,tuple
[int
,int
] |None
,Mode
]
- carry_bottle_phase2(image, predicted_x, init_flag)[source]
- Return type:
tuple
[float
|None
,tuple
[int
,int
] |None
,Mode
]
- carry_bottle_phase3(image, predicted_x, init_flag)[source]
- Return type:
tuple
[float
|None
,tuple
[int
,int
] |None
,Mode
]
- carry_bottle_phase4(image, predicted_x, init_flag)[source]
- Return type:
tuple
[float
|None
,tuple
[int
,int
] |None
,Mode
]
- carry_bottle_phase5(image, predicted_x, init_flag)[source]
- Return type:
tuple
[float
|None
,tuple
[int
,int
] |None
,Mode
]
- carry_bottle_phase6(image, predicted_x, init_flag)[source]
- Return type:
tuple
[float
|None
,tuple
[int
,int
] |None
,Mode
]
- carry_bottle_phase7(image, predicted_x, init_flag)[source]
- Return type:
tuple
[float
|None
,tuple
[int
,int
] |None
,Mode
]
- carry_bottle_phase8(image, predicted_x, init_flag)[source]
- Return type:
tuple
[float
|None
,tuple
[int
,int
] |None
,Mode
]
- carry_bottle_phase9(image, predicted_x, init_flag)[source]
- Return type:
tuple
[float
|None
,tuple
[int
,int
] |None
,Mode
]
- follow_left_edge(image, predicted_x)[source]
- Return type:
tuple
[float
|None
,tuple
[int
,int
] |None
,Mode
]
- class nnspike.unit.ETRobot(port='/dev/ttyACM0')[source]
Bases:
object
Interface for communicating with LEGO SPIKE Prime robot over serial connection.
This class provides methods to send commands to and receive sensor data from a LEGO SPIKE Prime robot. It maintains a continuous background thread to receive sensor updates and handles motor commands.
Command IDs should be as same as (spike/slot_prod.py) the script in LEGO Spike Prime.
- COMMAND_SET_MOTOR_FORWARD_SPEED_ID
Command ID for setting forward motor speed.
- Type:
int
- COMMAND_SET_MOTOR_BACKWARD_SPEED_ID
Command ID for setting backward motor speed.
- Type:
int
- COMMAND_SET_MOTOR_RELATIVE_POSITION_ID
Command ID for relative position movement.
- Type:
int
- COMMAND_STOP_MOTOR_ID
Command ID for stopping motors.
- Type:
int
- COMMAND_MOVE_ARM_ID
Command ID for moving robot arm.
- Type:
int
- CMD_FLAG
Command flag prefix for serial communication.
- Type:
bytes
- DUMMY
Dummy value for command parameters.
- Type:
int
- CMD_FLAG = b'CF:'
- COMMAND_MOVE_ARM_ID = 205
- COMMAND_SET_MOTOR_BACKWARD_SPEED_ID = 202
- COMMAND_SET_MOTOR_FORWARD_SPEED_ID = 201
- COMMAND_SET_MOTOR_RELATIVE_POSITION_ID = 203
- COMMAND_STOP_MOTOR_ID = 204
- DUMMY = 1
- __init__(port='/dev/ttyACM0')[source]
Initialize the ETRobot with a serial connection.
- Parameters:
port (str, optional) – The serial port to connect to. Defaults to “/dev/ttyACM0”.
- get_spike_status()[source]
Get the spike status with last known good sensor values.
- Returns:
Spike status object with consistent sensor data
- Return type:
- move_arm(action, duration=0.5)[source]
Move the arm up or down.
- Parameters:
action (int) – Action to perform (0 = move up, 1 = move down, 2 = stop arm).
- Return type:
None
- receive()[source]
Update ETRobot motor and sensor status by the received data from the GPIO port.
Note
The update rate should be less than the rate of sending sensor data in LEGO Prime Hub (0.0005 seconds).
- Return type:
None
- retrieve_motors_relative_position()[source]
Retrieve the relative positions of the motors.
- Returns:
The sum of the absolute values of the relative positions of both motors.
- Return type:
int
- class nnspike.unit.ModeManager(course)[source]
Bases:
object
Manages the current mode of the robot unit based on image analysis and sensor data.
This class is responsible for determining and transitioning between different behavior modes including edge following, obstacle avoidance, and various bottle carrying phases. Mode transitions are determined based on motor position thresholds, image processing results (color detection), and sensor readings.
The class uses toggle flags to prevent repeated transitions and ensures each mode change occurs only once when conditions are met.
- class nnspike.unit.WebcamVideoStream(src, save_video, save_path='', resolution=(640, 320), fps=30)[source]
Bases:
object
A threaded video stream reader for webcams with optional video recording.
This class provides a non-blocking way to read frames from a video source by running the capture loop in a separate thread. It supports configurable resolution, frame rate, and optional video recording to file.
The threaded approach helps prevent frame drops and provides smoother video processing by maintaining a minimal buffer size and continuous frame updates in the background.
- Parameters:
src (
int
|str
) – Video source - can be camera index (int) or video file path (str)save_video (
bool
) – Whether to save captured video to filesave_path (
str
) – Path for saving video file (required if save_video is True)resolution (
tuple
) – Video resolution as (width, height) tuple. Default: (640, 320)fps (
int
) – Frames per second for capture and recording. Default: 30
Example
>>> stream = WebcamVideoStream(src=0, save_video=False) >>> stream.start() >>> grabbed, frame = stream.read() >>> stream.stop()
- read()[source]
Read the most recently captured frame.
If video recording is enabled, also writes the frame to the output file.
- Return type:
tuple[bool, np.ndarray | None]
- Returns:
Tuple of (success_flag, frame) where success_flag indicates if the frame was successfully captured and frame is the image data as a numpy array, or None if capture failed.
- start()[source]
Start the background thread for reading video frames.
- Return type:
- Returns:
Self reference for method chaining.
- class nnspike.unit.SpikeStatus(raw_data=None)[source]
Bases:
object
Class to represent and access the status of a Lego Spike Prime hub.
This class provides a structured way to access the data received from the Spike Prime, including sensor readings, motor positions, and battery status.
nnspike.unit.action_chain module
- class nnspike.unit.action_chain.ActionChain(et, course)[source]
Bases:
object
A class to manage a sequence of actions for an ETRobot.
This class allows you to define a chain of actions, each consisting of setting left and right motor speeds for a specified duration.
- follow_left_edge(image, predicted_x)[source]
- Return type:
tuple
[float
|None
,tuple
[int
,int
] |None
,Mode
]
- follow_right_edge(image, predicted_x)[source]
- Return type:
tuple
[float
|None
,tuple
[int
,int
] |None
,Mode
]
- carry_bottle_phase1(image, predicted_x, init_flag)[source]
- Return type:
tuple
[float
|None
,tuple
[int
,int
] |None
,Mode
]
- carry_bottle_phase2(image, predicted_x, init_flag)[source]
- Return type:
tuple
[float
|None
,tuple
[int
,int
] |None
,Mode
]
- carry_bottle_phase3(image, predicted_x, init_flag)[source]
- Return type:
tuple
[float
|None
,tuple
[int
,int
] |None
,Mode
]
- carry_bottle_phase4(image, predicted_x, init_flag)[source]
- Return type:
tuple
[float
|None
,tuple
[int
,int
] |None
,Mode
]
- carry_bottle_phase5(image, predicted_x, init_flag)[source]
- Return type:
tuple
[float
|None
,tuple
[int
,int
] |None
,Mode
]
- carry_bottle_phase6(image, predicted_x, init_flag)[source]
- Return type:
tuple
[float
|None
,tuple
[int
,int
] |None
,Mode
]
- carry_bottle_phase7(image, predicted_x, init_flag)[source]
- Return type:
tuple
[float
|None
,tuple
[int
,int
] |None
,Mode
]
- carry_bottle_phase8(image, predicted_x, init_flag)[source]
- Return type:
tuple
[float
|None
,tuple
[int
,int
] |None
,Mode
]
- carry_bottle_phase9(image, predicted_x, init_flag)[source]
- Return type:
tuple
[float
|None
,tuple
[int
,int
] |None
,Mode
]
- carry_bottle_phase10(image, predicted_x, init_flag)[source]
- Return type:
tuple
[float
|None
,tuple
[int
,int
] |None
,Mode
]
- carry_bottle_phase11(image, predicted_x, init_flag)[source]
- Return type:
tuple
[float
|None
,tuple
[int
,int
] |None
,Mode
]
- carry_bottle_phase12(image, predicted_x, init_flag)[source]
- Return type:
tuple
[float
|None
,tuple
[int
,int
] |None
,Mode
]
- carry_bottle_phase13(image, predicted_x, init_flag)[source]
- Return type:
tuple
[float
|None
,tuple
[int
,int
] |None
,Mode
]
- carry_bottle_phase14(image, predicted_x, init_flag)[source]
- Return type:
tuple
[float
|None
,tuple
[int
,int
] |None
,Mode
]
- carry_bottle_phase15(image, predicted_x, init_flag)[source]
- Return type:
tuple
[float
|None
,tuple
[int
,int
] |None
,Mode
]
- carry_bottle_phase16(image, predicted_x, init_flag)[source]
- Return type:
tuple
[float
|None
,tuple
[int
,int
] |None
,Mode
]
- carry_bottle_phase17(image, predicted_x, init_flag)[source]
- Return type:
tuple
[float
|None
,tuple
[int
,int
] |None
,Mode
]
nnspike.unit.etrobot module
- class nnspike.unit.etrobot.ETRobot(port='/dev/ttyACM0')[source]
Bases:
object
Interface for communicating with LEGO SPIKE Prime robot over serial connection.
This class provides methods to send commands to and receive sensor data from a LEGO SPIKE Prime robot. It maintains a continuous background thread to receive sensor updates and handles motor commands.
Command IDs should be as same as (spike/slot_prod.py) the script in LEGO Spike Prime.
- COMMAND_SET_MOTOR_FORWARD_SPEED_ID
Command ID for setting forward motor speed.
- Type:
int
- COMMAND_SET_MOTOR_BACKWARD_SPEED_ID
Command ID for setting backward motor speed.
- Type:
int
- COMMAND_SET_MOTOR_RELATIVE_POSITION_ID
Command ID for relative position movement.
- Type:
int
- COMMAND_STOP_MOTOR_ID
Command ID for stopping motors.
- Type:
int
- COMMAND_MOVE_ARM_ID
Command ID for moving robot arm.
- Type:
int
- CMD_FLAG
Command flag prefix for serial communication.
- Type:
bytes
- DUMMY
Dummy value for command parameters.
- Type:
int
- COMMAND_SET_MOTOR_FORWARD_SPEED_ID = 201
- COMMAND_SET_MOTOR_BACKWARD_SPEED_ID = 202
- COMMAND_SET_MOTOR_RELATIVE_POSITION_ID = 203
- COMMAND_STOP_MOTOR_ID = 204
- COMMAND_MOVE_ARM_ID = 205
- CMD_FLAG = b'CF:'
- DUMMY = 1
- __init__(port='/dev/ttyACM0')[source]
Initialize the ETRobot with a serial connection.
- Parameters:
port (str, optional) – The serial port to connect to. Defaults to “/dev/ttyACM0”.
- receive()[source]
Update ETRobot motor and sensor status by the received data from the GPIO port.
Note
The update rate should be less than the rate of sending sensor data in LEGO Prime Hub (0.0005 seconds).
- Return type:
None
- get_spike_status()[source]
Get the spike status with last known good sensor values.
- Returns:
Spike status object with consistent sensor data
- Return type:
- retrieve_motors_relative_position()[source]
Retrieve the relative positions of the motors.
- Returns:
The sum of the absolute values of the relative positions of both motors.
- Return type:
int
- set_motor_speed(left_speed, right_speed)[source]
Set the ETRobot motor’s speed.
- Parameters:
left_speed (int) – Left motor speed (-100-100).
right_speed (int) – Right motor speed (-100-100).
- Return type:
None
nnspike.unit.mode_manager module
- class nnspike.unit.mode_manager.ModeManager(course)[source]
Bases:
object
Manages the current mode of the robot unit based on image analysis and sensor data.
This class is responsible for determining and transitioning between different behavior modes including edge following, obstacle avoidance, and various bottle carrying phases. Mode transitions are determined based on motor position thresholds, image processing results (color detection), and sensor readings.
The class uses toggle flags to prevent repeated transitions and ensures each mode change occurs only once when conditions are met.
nnspike.unit.spike_status module
- class nnspike.unit.spike_status.MotorStatus(position=None, power=None, relative_position=None, speed=None)[source]
Bases:
object
Status information for a motor connected to the Spike Prime hub.
-
position:
int
|None
= None
-
power:
int
|None
= None
-
relative_position:
int
|None
= None
-
speed:
int
|None
= None
- __init__(position=None, power=None, relative_position=None, speed=None)
-
position:
- class nnspike.unit.spike_status.ColorSensorStatus(reflected=None, ambient=None, color=None)[source]
Bases:
object
Status information for a color sensor connected to the Spike Prime hub.
-
reflected:
int
|None
= None
-
ambient:
int
|None
= None
-
color:
int
|None
= None
- __init__(reflected=None, ambient=None, color=None)
-
reflected:
- class nnspike.unit.spike_status.VectorStatus(x=0.0, y=0.0, z=0.0)[source]
Bases:
object
Status information for vector-based sensors (gyro, accelerometer).
-
x:
float
= 0.0
-
y:
float
= 0.0
-
z:
float
= 0.0
- __init__(x=0.0, y=0.0, z=0.0)
-
x:
- class nnspike.unit.spike_status.Position(x=0.0, y=0.0)[source]
Bases:
object
Position information from the Spike Prime hub.
-
x:
float
= 0.0
-
y:
float
= 0.0
- __init__(x=0.0, y=0.0)
-
x:
- class nnspike.unit.spike_status.BatteryStatus(voltage=None, percent=None)[source]
Bases:
object
Battery status information from the Spike Prime hub.
-
voltage:
float
|None
= None
-
percent:
float
|None
= None
- __init__(voltage=None, percent=None)
-
voltage:
- class nnspike.unit.spike_status.SensorStatus(distance=None, force=None, color=None, gyro=None, accelerometer=None, position=None)[source]
Bases:
object
Status information for all sensors connected to the Spike Prime hub.
-
distance:
int
|None
= None
-
force:
int
|None
= None
-
color:
ColorSensorStatus
|None
= None
-
gyro:
VectorStatus
|None
= None
-
accelerometer:
VectorStatus
|None
= None
- __init__(distance=None, force=None, color=None, gyro=None, accelerometer=None, position=None)
-
distance:
- class nnspike.unit.spike_status.SpikeStatus(raw_data=None)[source]
Bases:
object
Class to represent and access the status of a Lego Spike Prime hub.
This class provides a structured way to access the data received from the Spike Prime, including sensor readings, motor positions, and battery status.
- __init__(raw_data=None)[source]
Initialize the SpikeStatus object.
- Parameters:
raw_data (
str
|bytes
|dict
|None
) – Optional raw data from the Spike Prime to parse
nnspike.unit.webcam_video_stream module
- class nnspike.unit.webcam_video_stream.WebcamVideoStream(src, save_video, save_path='', resolution=(640, 320), fps=30)[source]
Bases:
object
A threaded video stream reader for webcams with optional video recording.
This class provides a non-blocking way to read frames from a video source by running the capture loop in a separate thread. It supports configurable resolution, frame rate, and optional video recording to file.
The threaded approach helps prevent frame drops and provides smoother video processing by maintaining a minimal buffer size and continuous frame updates in the background.
- Parameters:
src (
int
|str
) – Video source - can be camera index (int) or video file path (str)save_video (
bool
) – Whether to save captured video to filesave_path (
str
) – Path for saving video file (required if save_video is True)resolution (
tuple
) – Video resolution as (width, height) tuple. Default: (640, 320)fps (
int
) – Frames per second for capture and recording. Default: 30
Example
>>> stream = WebcamVideoStream(src=0, save_video=False) >>> stream.start() >>> grabbed, frame = stream.read() >>> stream.stop()
- start()[source]
Start the background thread for reading video frames.
- Return type:
- Returns:
Self reference for method chaining.
- update()[source]
Continuously update frames from the video stream in a background thread.
This method runs in a loop until stopped, constantly reading new frames from the video source to keep the frame buffer current.
- Return type:
None
- read()[source]
Read the most recently captured frame.
If video recording is enabled, also writes the frame to the output file.
- Return type:
tuple[bool, np.ndarray | None]
- Returns:
Tuple of (success_flag, frame) where success_flag indicates if the frame was successfully captured and frame is the image data as a numpy array, or None if capture failed.
nnspike.utils package
Utility functions and algorithms for robot control and image processing.
This module provides essential utility functions for image processing, control algorithms, and data recording used throughout the nnspike package for robot navigation and line following.
- Modules:
control: Computer vision algorithms for line detection and navigation image: Image processing and visualization utilities pid: PID controller implementation for smooth robot movement recorder: Data recording utilities for training data collection
Example
Using image processing and control utilities:
from nnspike.utils import find_line_edges_at_y, PIDController
# Detect line edges in an image left_edge, right_edge = find_line_edges_at_y(image, y_position=100)
# Create PID controller for steering pid = PIDController(kp=1.0, ki=0.1, kd=0.05) steering = pid.update(error)
- nnspike.utils.find_line_edges_at_y(image, roi, target_y, threshold_value=50)[source]
Get the left and right edge points of a black line at a specific Y coordinate.
- Parameters:
image (np.ndarray) – Input image (BGR or grayscale).
roi (tuple[int, int, int, int]) – Tuple (x, y, width, height) defining the ROI.
target_y (float) – The Y coordinate where to detect line edges (in original image coordinates).
threshold_value (float, optional) – Threshold for binary conversion. Defaults to 50.
- Returns:
- Tuple containing:
left_x: X coordinate of left edge (None if not found)
right_x: X coordinate of right edge (None if not found)
- Return type:
tuple[float | None, float | None]
- nnspike.utils.find_gate_virtual_line(image, scan_x=320, from_y=0, to_y=480)[source]
Find the virtual line for gate detection based on gray color regions.
This function detects gray regions in an image and calculates the center point between the leftmost and rightmost gray pixels from a given scan position.
- Parameters:
image (np.ndarray) – Input BGR image as numpy array.
scan_x (int, optional) – X-coordinate to start scanning from. Defaults to 320.
from_y (int, optional) – Starting Y-coordinate for scanning. Defaults to 0.
to_y (int, optional) – Ending Y-coordinate for scanning. Defaults to 480.
- Returns:
- Tuple containing:
virtual_line_coords: Virtual line coordinates (x, y) or None if no gate found
gray_mask: The processed gray mask for debugging
width: Width between left and right borders or None
- Return type:
tuple[tuple[float, float] | None, np.ndarray | None, float | None]
- nnspike.utils.find_bottle_center(image, color, min_area=500)[source]
Find the center coordinates and color pixel count of a colored object in an image using OpenCV.
This function detects objects of a specified color in an image and returns information about the largest detected object. It supports yellow, blue, and red color detection and can be used for various applications including object tracking, color-based navigation, and visual recognition.
This function is optimized for real-time applications with the following improvements: - Accepts numpy array input instead of file paths for real-time processing - Uses adaptive thresholding for better edge detection under various lighting conditions - Applies contour area filtering to reduce noise and false detections - Includes aspect ratio validation to filter out non-object-like shapes - Uses smaller morphological kernels for better performance - Removes debug print statements for cleaner real-time operation
- Parameters:
image (np.ndarray) – Input image as numpy array (BGR format).
color (str) – Color to detect (‘yellow’, ‘blue’, or ‘red’).
min_area (int, optional) – Minimum contour area threshold for filtering noise. Defaults to 500.
- Returns:
- Tuple containing:
center: (x, y) center coordinates of the largest detected object
largest_contour: Contour of the largest detected object
color_pixel_count: Number of detected color pixels
Returns (None, None, 0) if not found.
- Return type:
tuple[tuple[float, float] | None, np.ndarray | None, float | None]
- Raises:
ValueError – If color parameter is not ‘yellow’, ‘blue’, or ‘red’.
- nnspike.utils.find_bullseye(image, threshold=120)[source]
Find the center coordinates of a blue bullseye target in an image.
Uses blue color masking followed by circular shape detection for efficient bullseye detection. Prioritizes detections within a specified distance from the image center (x=320).
- Parameters:
image (np.ndarray) – Input image as numpy array (BGR format).
threshold (float, optional) – Maximum allowed distance from image center (x=320). Bullseyes within range [320-threshold, 320+threshold] are prioritized. Defaults to 120.
- Returns:
- Tuple containing:
center: (x, y) center coordinates of the detected bullseye
contour: Detected bullseye contour
blue_pixel_count: Number of blue pixels
Returns (None, None, None) if not found.
- Return type:
tuple[tuple[float, float] | None, np.ndarray | None, float | None]
- nnspike.utils.calculate_attitude_angle(offset_pixels, roi_bottom_y, camera_height=0.2, focal_length_pixels=640)[source]
Calculate attitude angle (theta) from pixel offset using camera geometry.
This function converts the pixel-based offset detected in the camera image to a real-world attitude angle that represents the robot’s deviation from the desired path. This provides more physically meaningful control compared to simple pixel-based normalization.
- Parameters:
offset_pixels (float) – Lateral offset in pixels from image center
roi_bottom_y (int) – Bottom y-coordinate of ROI (closer to robot)
camera_height (float, optional) – Camera height above ground in meters. Defaults to 0.20.
focal_length_pixels (float, optional) – Camera focal length in pixels. Defaults to 640.
- Returns:
- Attitude angle (theta) in radians. Positive values indicate rightward deviation,
negative values indicate leftward deviation.
- Return type:
float
Note
The camera parameters (height and focal length) should be calibrated for your specific robot setup to ensure accurate angle calculations.
- nnspike.utils.normalize_image(image)[source]
Normalize an input image by converting its color space, applying Gaussian blur, resizing, and scaling pixel values.
This function performs the following steps: 1. Converts the image from RGB to YUV color space. 2. Applies a Gaussian blur with a kernel size of 5x5. 3. Resizes the image to dimensions 200x66. 4. Scales the pixel values to the range [0, 1].
- Parameters:
image (np.ndarray) – Input image in RGB format as a NumPy array.
- Returns:
Normalized image as a NumPy array.
- Return type:
np.ndarray
- nnspike.utils.draw_driving_info(image, info, roi)[source]
Draws driving information on an image.
This function overlays driving-related information onto a given image. It draws a tracing point, a region of interest (ROI) rectangle, and various text annotations based on the provided info dictionary.
- Parameters:
image (np.ndarray) – The input image on which to draw the information.
info (dict) –
A dictionary containing the driving information to be displayed. Expected keys are:
”trace_x” (int or str): The x-coordinate for the tracing point.
”trace_y” (int or str): The y-coordinate for the tracing point.
”text” (dict): A dictionary of text annotations where keys are the labels and values are the corresponding data.
roi (tuple[int, int, int, int]) – A tuple defining the region of interest in the format (x1, y1, x2, y2).
- Returns:
The image with the overlaid driving information.
- Return type:
np.ndarray
- class nnspike.utils.PIDController(kp, ki, kd, setpoint, output_limits=(None, None))[source]
Bases:
object
A PID (Proportional-Integral-Derivative) controller for closed-loop control systems.
This class implements a discrete-time PID controller that continuously calculates an error value as the difference between a desired setpoint and a measured process variable. The controller applies proportional, integral, and derivative corrections to minimize this error over time, making it suitable for robotic control systems, motor speed regulation, and other feedback control applications.
Methodology: The PID controller uses three distinct parameters: - Proportional (P): Provides output proportional to the current error - Integral (I): Accumulates past errors to eliminate steady-state error - Derivative (D): Predicts future error based on the rate of change
The control output is calculated as: output = Kp * error + Ki * integral + Kd * derivative
where error = setpoint - measured_value
Results: The controller produces a continuous control signal that drives the system towards the desired setpoint. Output can be constrained within specified limits to prevent actuator saturation or system damage.
Conclusion: This implementation provides time-aware PID control with integral windup prevention through output limiting. The controller maintains internal state between updates, making it suitable for real-time control applications. Note that proper tuning of Kp, Ki, and Kd parameters is crucial for optimal performance and system stability.
- kp
Proportional gain coefficient.
- Type:
float
- ki
Integral gain coefficient.
- Type:
float
- kd
Derivative gain coefficient.
- Type:
float
- setpoint
Target value that the system should achieve.
- Type:
float
- output_limits
Min/max output constraints.
- Type:
tuple[float | None, float | None]
Example
>>> # Create PID controller for steering control >>> pid = PIDController(kp=1.0, ki=0.1, kd=0.05, setpoint=0.0) >>> pid.set_output_limits((-1.0, 1.0)) >>> >>> # Update control loop >>> measured_position = get_sensor_reading() >>> control_output = pid.update(measured_position) >>> apply_control_signal(control_output)
- __init__(kp, ki, kd, setpoint, output_limits=(None, None))[source]
Initializes the PIDController with specified parameters.
Sets up the PID controller with the given gain values, target setpoint, and optional output constraints. Initializes internal state variables for time tracking and error accumulation.
- Parameters:
kp (float) – Proportional gain coefficient. Higher values increase responsiveness but may cause overshoot.
ki (float) – Integral gain coefficient. Eliminates steady-state error but may cause instability if too high.
kd (float) – Derivative gain coefficient. Reduces overshoot and improves stability but sensitive to noise.
setpoint (float) – Target value that the controller should achieve.
output_limits (tuple[float | None, float | None], optional) – Minimum and maximum output bounds. Use None for unbounded limits. Defaults to (None, None).
Example
>>> # Create PID for temperature control >>> pid = PIDController( ... kp=2.0, ki=0.5, kd=0.1, setpoint=25.0, output_limits=(-100, 100) ... )
- set_output_limits(new_output_limits)[source]
Updates the output limits for the PID controller.
Modifies the minimum and maximum bounds for the controller output. This helps prevent actuator saturation and integral windup.
- Parameters:
new_output_limits (tuple[float, float]) – New minimum and maximum output limits as (min_limit, max_limit).
- Return type:
None
Example
>>> pid.set_output_limits((-50.0, 50.0)) # Limit output to ±50
- update(measured_value)[source]
Computes the PID control output based on current measurement.
Calculates the control signal by combining proportional, integral, and derivative terms. Updates internal state for the next iteration and applies output limits if specified.
- Parameters:
measured_value (float) – Current value of the process variable being controlled (e.g., current position, temperature, speed).
- Returns:
Control output signal, typically used for actuator control (e.g., motor commands, valve positions). Value is constrained within the specified output limits.
- Return type:
float
Note
The first call may have reduced accuracy for the derivative term due to lack of previous time reference.
Example
>>> current_temp = thermometer.read() >>> heater_power = pid.update(current_temp) >>> heater.set_power(heater_power)
- class nnspike.utils.SensorRecorder(output_dir='storage/sensor_data', timestamp=None)[source]
Bases:
object
- High-performance CSV recorder for ETRobot sensor data and control information.
This class manages CSV file creation, writing, and cleanup with optimizations for
real-time robot operation at high frame rates (30fps+).
Features: - Single file open/close for entire session - Buffered writing for performance - Periodic flushing for data safety - Automatic cleanup on program exit - Comprehensive sensor and control data logging
- __init__(output_dir='storage/sensor_data', timestamp=None)[source]
Initialize the sensor recorder.
- Parameters:
output_dir (
str
) – Directory to store CSV filestimestamp (
str
|None
) – Custom timestamp for filename, auto-generated if None
- get_filename()[source]
Get the current CSV filename.
- Returns:
Full path to the CSV file
- Return type:
str
- get_frame_count()[source]
Get the current frame count.
- Returns:
Number of frames recorded
- Return type:
int
- log_frame_data(spike_status, mode=None)[source]
Log sensor data for a single frame.
- Parameters:
spike_status (
SpikeStatus
) – SpikeStatus object with sensor datamode (
Mode
|None
) – Current behavior mode (e.g., Mode.LEFT_EDGE_FOLLOWING)
- Return type:
None
nnspike.utils.control module
Computer vision control utilities for robot navigation and object detection.
This module provides computer vision algorithms for robot control applications, including line following, object detection, and target recognition. The functions process camera images to extract navigation information and detect specific objects for autonomous robot control systems.
The module supports various detection tasks: - Line edge detection for path following - Colored object detection for navigation markers - Bullseye target detection for precision tasks - Gate detection using virtual line calculation - Attitude angle calculation for steering control
- Functions:
- find_line_edges_at_y(image: np.ndarray, roi: tuple[int, int, int, int], target_y: float, threshold_value: float = 50) -> tuple[float | None, float | None]:
Get the left and right edge points of a black line at a specific Y coordinate.
- find_bottle_center(image: np.ndarray, color: str, min_area: int = 500) -> tuple[tuple[float, float] | None, np.ndarray | None, float | None]:
Find the center coordinates and color pixel count of a colored object in an image using OpenCV.
- find_bullseye(image: np.ndarray, threshold: float = 120) -> tuple[tuple[float, float] | None, np.ndarray | None, float | None]:
Find the center coordinates of a blue bullseye target in an image.
- find_gate_virtual_line(image: np.ndarray, scan_x: int = 320, from_y: int = 0, to_y: int = 480) -> tuple[tuple[float, float] | None, np.ndarray | None, float | None]:
Find the virtual line for gate detection based on gray color regions.
- calculate_attitude_angle(offset_pixels: float, roi_bottom_y: int, camera_height: float = 0.20, focal_length_pixels: float = 640) -> float:
Calculate attitude angle (theta) from pixel offset using camera geometry.
Note
All functions expect BGR format numpy arrays as input images. The module is optimized for real-time applications with efficient contour detection and morphological operations.
- nnspike.utils.control.find_line_edges_at_y(image, roi, target_y, threshold_value=50)[source]
Get the left and right edge points of a black line at a specific Y coordinate.
- Parameters:
image (np.ndarray) – Input image (BGR or grayscale).
roi (tuple[int, int, int, int]) – Tuple (x, y, width, height) defining the ROI.
target_y (float) – The Y coordinate where to detect line edges (in original image coordinates).
threshold_value (float, optional) – Threshold for binary conversion. Defaults to 50.
- Returns:
- Tuple containing:
left_x: X coordinate of left edge (None if not found)
right_x: X coordinate of right edge (None if not found)
- Return type:
tuple[float | None, float | None]
- nnspike.utils.control.find_bottle_center(image, color, min_area=500)[source]
Find the center coordinates and color pixel count of a colored object in an image using OpenCV.
This function detects objects of a specified color in an image and returns information about the largest detected object. It supports yellow, blue, and red color detection and can be used for various applications including object tracking, color-based navigation, and visual recognition.
This function is optimized for real-time applications with the following improvements: - Accepts numpy array input instead of file paths for real-time processing - Uses adaptive thresholding for better edge detection under various lighting conditions - Applies contour area filtering to reduce noise and false detections - Includes aspect ratio validation to filter out non-object-like shapes - Uses smaller morphological kernels for better performance - Removes debug print statements for cleaner real-time operation
- Parameters:
image (np.ndarray) – Input image as numpy array (BGR format).
color (str) – Color to detect (‘yellow’, ‘blue’, or ‘red’).
min_area (int, optional) – Minimum contour area threshold for filtering noise. Defaults to 500.
- Returns:
- Tuple containing:
center: (x, y) center coordinates of the largest detected object
largest_contour: Contour of the largest detected object
color_pixel_count: Number of detected color pixels
Returns (None, None, 0) if not found.
- Return type:
tuple[tuple[float, float] | None, np.ndarray | None, float | None]
- Raises:
ValueError – If color parameter is not ‘yellow’, ‘blue’, or ‘red’.
- nnspike.utils.control.find_bullseye(image, threshold=120)[source]
Find the center coordinates of a blue bullseye target in an image.
Uses blue color masking followed by circular shape detection for efficient bullseye detection. Prioritizes detections within a specified distance from the image center (x=320).
- Parameters:
image (np.ndarray) – Input image as numpy array (BGR format).
threshold (float, optional) – Maximum allowed distance from image center (x=320). Bullseyes within range [320-threshold, 320+threshold] are prioritized. Defaults to 120.
- Returns:
- Tuple containing:
center: (x, y) center coordinates of the detected bullseye
contour: Detected bullseye contour
blue_pixel_count: Number of blue pixels
Returns (None, None, None) if not found.
- Return type:
tuple[tuple[float, float] | None, np.ndarray | None, float | None]
- nnspike.utils.control.find_gate_virtual_line(image, scan_x=320, from_y=0, to_y=480)[source]
Find the virtual line for gate detection based on gray color regions.
This function detects gray regions in an image and calculates the center point between the leftmost and rightmost gray pixels from a given scan position.
- Parameters:
image (np.ndarray) – Input BGR image as numpy array.
scan_x (int, optional) – X-coordinate to start scanning from. Defaults to 320.
from_y (int, optional) – Starting Y-coordinate for scanning. Defaults to 0.
to_y (int, optional) – Ending Y-coordinate for scanning. Defaults to 480.
- Returns:
- Tuple containing:
virtual_line_coords: Virtual line coordinates (x, y) or None if no gate found
gray_mask: The processed gray mask for debugging
width: Width between left and right borders or None
- Return type:
tuple[tuple[float, float] | None, np.ndarray | None, float | None]
- nnspike.utils.control.calculate_attitude_angle(offset_pixels, roi_bottom_y, camera_height=0.2, focal_length_pixels=640)[source]
Calculate attitude angle (theta) from pixel offset using camera geometry.
This function converts the pixel-based offset detected in the camera image to a real-world attitude angle that represents the robot’s deviation from the desired path. This provides more physically meaningful control compared to simple pixel-based normalization.
- Parameters:
offset_pixels (float) – Lateral offset in pixels from image center
roi_bottom_y (int) – Bottom y-coordinate of ROI (closer to robot)
camera_height (float, optional) – Camera height above ground in meters. Defaults to 0.20.
focal_length_pixels (float, optional) – Camera focal length in pixels. Defaults to 640.
- Returns:
- Attitude angle (theta) in radians. Positive values indicate rightward deviation,
negative values indicate leftward deviation.
- Return type:
float
Note
The camera parameters (height and focal length) should be calibrated for your specific robot setup to ensure accurate angle calculations.
nnspike.utils.image module
This module provides utility functions for image and video processing using OpenCV and NumPy.
- Functions:
- normalize_image(image: np.ndarray) -> np.ndarray:
Normalize an input image by converting its color space, applying Gaussian blur, resizing, and scaling pixel values.
- draw_driving_info(image: np.ndarray, info: dict, roi: tuple[int, int, int, int]) -> np.ndarray:
Draws driving information on an image by overlaying a tracing point, a region of interest (ROI) rectangle, and various text annotations based on the provided info dictionary.
- extract_video_frames(video_path: str, frame_path: str) -> None:
Extracts frames from a video file and saves them as individual image files in the specified directory.
- nnspike.utils.image.normalize_image(image)[source]
Normalize an input image by converting its color space, applying Gaussian blur, resizing, and scaling pixel values.
This function performs the following steps: 1. Converts the image from RGB to YUV color space. 2. Applies a Gaussian blur with a kernel size of 5x5. 3. Resizes the image to dimensions 200x66. 4. Scales the pixel values to the range [0, 1].
- Parameters:
image (np.ndarray) – Input image in RGB format as a NumPy array.
- Returns:
Normalized image as a NumPy array.
- Return type:
np.ndarray
- nnspike.utils.image.draw_driving_info(image, info, roi)[source]
Draws driving information on an image.
This function overlays driving-related information onto a given image. It draws a tracing point, a region of interest (ROI) rectangle, and various text annotations based on the provided info dictionary.
- Parameters:
image (np.ndarray) – The input image on which to draw the information.
info (dict) –
A dictionary containing the driving information to be displayed. Expected keys are:
”trace_x” (int or str): The x-coordinate for the tracing point.
”trace_y” (int or str): The y-coordinate for the tracing point.
”text” (dict): A dictionary of text annotations where keys are the labels and values are the corresponding data.
roi (tuple[int, int, int, int]) – A tuple defining the region of interest in the format (x1, y1, x2, y2).
- Returns:
The image with the overlaid driving information.
- Return type:
np.ndarray
nnspike.utils.pid module
PID (Proportional-Integral-Derivative) controller implementation for control systems.
This module provides a discrete-time PID controller class designed for real-time control applications in robotics, automation, and other feedback control systems. The implementation focuses on practical usage with features like output limiting, time-aware calculations, and integral windup prevention.
Methodology: The PID controller implements the standard control algorithm:
output = Kp * error + Ki * ∫error*dt + Kd * d(error)/dt
Where: - Kp (Proportional): Reacts to current error magnitude - Ki (Integral): Eliminates steady-state error by accumulating past errors - Kd (Derivative): Dampens oscillations by predicting future error trends
Results: The controller produces smooth, stable control signals suitable for: - Motor speed and position control - Temperature regulation - Line following in robotics - Steering correction systems - Process control applications
Conclusion: This implementation provides a robust foundation for feedback control systems with proper time handling and output constraints. Users should focus on proper parameter tuning (Kp, Ki, Kd) for their specific application to achieve optimal performance and system stability.
Example
Basic usage for a line-following robot:
>>> from nnspike.utils.pid import PIDController
>>>
>>> # Create steering controller (setpoint = 0 means centered)
>>> steering_pid = PIDController(kp=1.2, ki=0.05, kd=0.8, setpoint=0.0)
>>> steering_pid.set_output_limits((-1.0, 1.0)) # Limit steering range
>>>
>>> # Control loop
>>> while robot.is_running():
... line_position = robot.get_line_position() # -1 to 1 range
... steering_correction = steering_pid.update(line_position)
... robot.set_steering(steering_correction)
... time.sleep(0.01) # 100Hz control frequency
- Classes:
PIDController: Main PID controller implementation with output limiting.
- Dependencies:
time: For delta time calculations between updates.
- class nnspike.utils.pid.PIDController(kp, ki, kd, setpoint, output_limits=(None, None))[source]
Bases:
object
A PID (Proportional-Integral-Derivative) controller for closed-loop control systems.
This class implements a discrete-time PID controller that continuously calculates an error value as the difference between a desired setpoint and a measured process variable. The controller applies proportional, integral, and derivative corrections to minimize this error over time, making it suitable for robotic control systems, motor speed regulation, and other feedback control applications.
Methodology: The PID controller uses three distinct parameters: - Proportional (P): Provides output proportional to the current error - Integral (I): Accumulates past errors to eliminate steady-state error - Derivative (D): Predicts future error based on the rate of change
The control output is calculated as: output = Kp * error + Ki * integral + Kd * derivative
where error = setpoint - measured_value
Results: The controller produces a continuous control signal that drives the system towards the desired setpoint. Output can be constrained within specified limits to prevent actuator saturation or system damage.
Conclusion: This implementation provides time-aware PID control with integral windup prevention through output limiting. The controller maintains internal state between updates, making it suitable for real-time control applications. Note that proper tuning of Kp, Ki, and Kd parameters is crucial for optimal performance and system stability.
- kp
Proportional gain coefficient.
- Type:
float
- ki
Integral gain coefficient.
- Type:
float
- kd
Derivative gain coefficient.
- Type:
float
- setpoint
Target value that the system should achieve.
- Type:
float
- output_limits
Min/max output constraints.
- Type:
tuple[float | None, float | None]
Example
>>> # Create PID controller for steering control >>> pid = PIDController(kp=1.0, ki=0.1, kd=0.05, setpoint=0.0) >>> pid.set_output_limits((-1.0, 1.0)) >>> >>> # Update control loop >>> measured_position = get_sensor_reading() >>> control_output = pid.update(measured_position) >>> apply_control_signal(control_output)
- __init__(kp, ki, kd, setpoint, output_limits=(None, None))[source]
Initializes the PIDController with specified parameters.
Sets up the PID controller with the given gain values, target setpoint, and optional output constraints. Initializes internal state variables for time tracking and error accumulation.
- Parameters:
kp (float) – Proportional gain coefficient. Higher values increase responsiveness but may cause overshoot.
ki (float) – Integral gain coefficient. Eliminates steady-state error but may cause instability if too high.
kd (float) – Derivative gain coefficient. Reduces overshoot and improves stability but sensitive to noise.
setpoint (float) – Target value that the controller should achieve.
output_limits (tuple[float | None, float | None], optional) – Minimum and maximum output bounds. Use None for unbounded limits. Defaults to (None, None).
Example
>>> # Create PID for temperature control >>> pid = PIDController( ... kp=2.0, ki=0.5, kd=0.1, setpoint=25.0, output_limits=(-100, 100) ... )
- set_output_limits(new_output_limits)[source]
Updates the output limits for the PID controller.
Modifies the minimum and maximum bounds for the controller output. This helps prevent actuator saturation and integral windup.
- Parameters:
new_output_limits (tuple[float, float]) – New minimum and maximum output limits as (min_limit, max_limit).
- Return type:
None
Example
>>> pid.set_output_limits((-50.0, 50.0)) # Limit output to ±50
- update(measured_value)[source]
Computes the PID control output based on current measurement.
Calculates the control signal by combining proportional, integral, and derivative terms. Updates internal state for the next iteration and applies output limits if specified.
- Parameters:
measured_value (float) – Current value of the process variable being controlled (e.g., current position, temperature, speed).
- Returns:
Control output signal, typically used for actuator control (e.g., motor commands, valve positions). Value is constrained within the specified output limits.
- Return type:
float
Note
The first call may have reduced accuracy for the derivative term due to lack of previous time reference.
Example
>>> current_temp = thermometer.read() >>> heater_power = pid.update(current_temp) >>> heater.set_power(heater_power)
nnspike.utils.recorder module
Sensor Data Recorder Module
This module provides a class for recording ETRobot sensor status and control data to CSV files. It’s designed for high-performance logging during robot operation without impacting frame rates.
- class nnspike.utils.recorder.SensorRecorder(output_dir='storage/sensor_data', timestamp=None)[source]
Bases:
object
- High-performance CSV recorder for ETRobot sensor data and control information.
This class manages CSV file creation, writing, and cleanup with optimizations for
real-time robot operation at high frame rates (30fps+).
Features: - Single file open/close for entire session - Buffered writing for performance - Periodic flushing for data safety - Automatic cleanup on program exit - Comprehensive sensor and control data logging
- __init__(output_dir='storage/sensor_data', timestamp=None)[source]
Initialize the sensor recorder.
- Parameters:
output_dir (
str
) – Directory to store CSV filestimestamp (
str
|None
) – Custom timestamp for filename, auto-generated if None
- start_recording()[source]
Start CSV recording session.
Creates output directory, opens CSV file, writes headers, and sets up cleanup.
- Return type:
None
- log_frame_data(spike_status, mode=None)[source]
Log sensor data for a single frame.
- Parameters:
spike_status (
SpikeStatus
) – SpikeStatus object with sensor datamode (
Mode
|None
) – Current behavior mode (e.g., Mode.LEFT_EDGE_FOLLOWING)
- Return type:
None
- get_filename()[source]
Get the current CSV filename.
- Returns:
Full path to the CSV file
- Return type:
str
- get_frame_count()[source]
Get the current frame count.
- Returns:
Number of frames recorded
- Return type:
int