I was one of the few lucky folks to get my hands on a Google AIY voice kit for Raspberry Pi from The MagPi. After following the official instructions and toying around with the box, I really wanted to figure out how to implement some additional features.
My eventual goal was to implement the "Ok Google" hotword detection, but it seems I was beaten to the punch.
But of course learning is a big part of the fun, and the Google documentation for the Trigger system was lacking, so I kept hacking at my first set of test code and learned a few things worth sharing.
First off I'll present the code that I finally got working, take it a step further with Amazon Dash button detection, and finally summarize some important notes that seem to be lacking from the trigger documentation.
Getting a trigger to work:
I'll save the detailed commentary for the code summary at the end. As a first attempt I put together an example Trigger definition that triggers voice recognition when a certain file is created on the filesystem. Create the following file at "~/voice-recognizer-raspi/src/triggers/file.py":
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Trigger based on the existence of a specific file."""

from time import sleep  # Sleep to prevent the file checker from using 100% CPU
import os  # To monitor for files this will be required
import stat  # For setting permissions on the trigger file directory
import threading  # Separate file monitor thread from the voice recognizer

from triggers.trigger import Trigger  # Google AIY Trigger class

# The following allows us to print debugging information to the terminal
import logging
logger = logging.getLogger('trigger')


# Create a new class for the FileTrigger
class FileTrigger(Trigger):
    """Trigger based on the existence of a specific file."""

    POLLING_TIME = 0.5  # The file monitor will wait 0.5s between file checks

    # This file will trigger voice recognition
    TRIGGER_FILE = r"/tmp/voice_recognizer/trigger"
    # TODO: specify a group to have access to the trigger file directory

    def __init__(self):
        super().__init__()  # Run the initializer of the parent Trigger class

    def start(self):
        # Delete trigger file if it exists at startup
        if os.path.isfile(self.TRIGGER_FILE):
            logger.info('cleaning up pre-existing trigger file')
            os.remove(self.TRIGGER_FILE)

        # Create the trigger directory if needed
        # Determine trigger directory
        trigger_dir = os.path.dirname(os.path.abspath(self.TRIGGER_FILE))
        if not os.path.exists(trigger_dir):
            os.makedirs(trigger_dir)  # Create directory and parents if required
            # Store existing permissions of the trigger directory
            permissions = stat.S_IMODE(os.lstat(trigger_dir)[stat.ST_MODE])
            # Make the trigger directory world writeable
            os.chmod(trigger_dir, permissions | stat.S_IWOTH)

        # Start the file monitor loop as a separate thread
        threading.Thread(target=self.file_monitor_loop).start()

    def file_monitor_loop(self):
        # Loop until the file exists
        while not os.path.isfile(self.TRIGGER_FILE):
            sleep(self.POLLING_TIME)  # Wait POLLING_TIME in seconds
        os.remove(self.TRIGGER_FILE)  # Delete trigger file
        self.callback()  # Trigger voice recognition
- When the file specified by TRIGGER_FILE (default is "/tmp/voice_recognizer/trigger") is created, the system will trigger voice recognition and delete the file to reset the state.
- By default the trigger's folder is configured as world-writeable. If you want to implement secure access (e.g. by ACL or user/group), you should change TRIGGER_FILE to point into a persistent folder on the disk (e.g. "/home/pi/voice-recognizer-raspi/src/triggers/trigger_file/trigger") and set that folder's (i.e. "trigger_file/") permissions accordingly.
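If world-writeable is too permissive for your setup, one option is to limit the folder to its owner and a group. Here's a minimal sketch using only the standard library. The helper name restrict_trigger_dir is my own invention and not part of the AIY code, and handing the folder to a group only works if the calling user belongs to that group (or is root).

```python
import os
import shutil
import stat


def restrict_trigger_dir(trigger_dir, group=None):
    """Create trigger_dir if needed and limit access to owner and group.

    Hypothetical helper for illustration; not part of the AIY code base.
    Returns the resulting permission bits.
    """
    os.makedirs(trigger_dir, exist_ok=True)
    if group is not None:
        # Hand the directory to the given group (needs suitable privileges)
        shutil.chown(trigger_dir, group=group)
    # rwx for owner and group, nothing for everyone else
    os.chmod(trigger_dir, stat.S_IRWXU | stat.S_IRWXG)
    return stat.S_IMODE(os.stat(trigger_dir).st_mode)
```

Any program that should be allowed to fire the trigger would then need to run as a member of that group.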
You need to modify "~/voice-recognizer-raspi/src/main.py" and change the section pertaining to triggers from this:
if args.trigger == 'gpio':
    import triggers.gpio
    triggerer = triggers.gpio.GpioTrigger(channel=23)
    msg = 'Press the button on GPIO 23'
elif args.trigger == 'clap':
    import triggers.clap
    triggerer = triggers.clap.ClapTrigger(recorder)
    msg = 'Clap your hands'
else:
    logger.error("Unknown trigger '%s'", args.trigger)
    return
to this:
if args.trigger == 'gpio':
    import triggers.gpio
    triggerer = triggers.gpio.GpioTrigger(channel=23)
    msg = 'Press the button on GPIO 23'
elif args.trigger == 'clap':
    import triggers.clap
    triggerer = triggers.clap.ClapTrigger(recorder)
    msg = 'Clap your hands'
elif args.trigger == 'file':
    import triggers.file
    triggerer = triggers.file.FileTrigger()
    msg = 'Trigger on existence of a specific file'
else:
    logger.error("Unknown trigger '%s'", args.trigger)
    return
Then start the voice recognizer with the file trigger selected:
source ~/voice-recognizer-raspi/env/bin/activate
~/voice-recognizer-raspi/src/main.py -T file
To trigger voice recognition, create the trigger file:
touch /tmp/voice_recognizer/trigger
The really nice thing about this configuration is that it makes programmatic triggering much easier than writing a new trigger class each time; to trigger the voice kit all your program needs to do is create the trigger file. This should make it relatively easy to fire the trigger from arbitrary events such as a webpage submission, pressing the GPIO button on the kit, or even an Amazon Dash button. Unlike the other included triggers, it can even respond to multiple types of input without further modifications to the voice recognizer code.
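As a rough illustration, here's how another Python program could fire the trigger. This is a sketch rather than part of the kit: the function name fire_trigger is hypothetical, and it assumes the voice recognizer is already running with the file trigger, so the trigger folder exists and is writeable.

```python
from pathlib import Path

# Same default location as TRIGGER_FILE in file.py
TRIGGER_FILE = Path("/tmp/voice_recognizer/trigger")


def fire_trigger(trigger_file=TRIGGER_FILE):
    """Create the trigger file so the running FileTrigger picks it up.

    Hypothetical helper; raises FileNotFoundError if the trigger
    folder does not exist yet.
    """
    trigger_file = Path(trigger_file)
    trigger_file.touch()  # Same effect as the touch command
    return trigger_file
```

Calling fire_trigger() from a web handler or a button callback should then have the same effect as running touch by hand.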
Triggering Voice Recognition with an Amazon Dash button:
So far I have been able to adapt the Amazon Dash button example code from the MagPi to trigger voice recognition.
The bad news is the example is missing some important details, but the good news is I seem to have worked them out on my Pi and I'll share what I know here.
I'm going to show how to implement this in Python 3, which I'm most familiar with, but for those more familiar with JavaScript there are examples using Node.js that you may find easier to follow. I didn't use Python 2.7 because I was having problems getting scapy to run.
First off you'll need to configure your environment with the necessary packages. The example I'm working from depends on a Python library called scapy, which also depends on the package tcpdump.
Important note: scapy sniffs packets, which requires low-level access to the network interface. These scripts will only work when run as root (e.g. with su, sudo, or the root crontab). It's generally a bad practice to run code you do not trust with root privileges: it's up to you whether you think the benefits outweigh the risks. If for some reason you suspect the following source code, python, scapy, or tcpdump could harm your system you should be aware that these tasks will have the most privileged access to your Raspberry Pi. I cannot be held responsible for any ill effects caused by executing the following code.
Now then, first you'll need to ensure you have scapy and tcpdump available to root. To do so open a terminal and run the following commands:
sudo pip3 install scapy # Install scapy on root's python3 (current releases support Python 3)
sudo which python3 >> ~/.scapy_installed.log # For debugging
sudo apt-get update # Update apt package list
sudo apt-get install tcpdump # Install prerequisite for scapy
Next, set up your Dash button so that pressing it doesn't order anything:
- Configure your Amazon Dash button completely with the Amazon app.
- Deactivate the button.
- Start configuring the button again, but when you reach the "choose an item" page don't select an item and exit out of the app.
The next thing you need is the MAC address(es) of your Dash button(s). Once again the instructions exist in other examples, but I'll provide the short summary here.
If you don't know what a MAC address is you might have some trouble. The only pointer I can give is that a MAC address is 12 hexadecimal digits grouped into pairs separated by colons, e.g. 1A:2B:3C:4D:5E:6F.
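If you want to double check that an address you copied from your router has that shape, a small check like the following may help. It's only a sketch: the names MAC_PATTERN and normalize_mac are made up for this example, and it lowercases the result because the detection script further down compares against lowercase addresses.

```python
import re

# Six pairs of hex digits separated by colons, e.g. 1A:2B:3C:4D:5E:6F
MAC_PATTERN = re.compile(r"^[0-9A-Fa-f]{2}(:[0-9A-Fa-f]{2}){5}$")


def normalize_mac(mac):
    """Return the MAC address in lowercase, or raise ValueError if malformed."""
    mac = mac.strip()
    if not MAC_PATTERN.match(mac):
        raise ValueError("not a colon-separated MAC address: %r" % mac)
    return mac.lower()
```

An address written with dashes or without separators would be rejected here, so convert it to the colon form first.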
You should be able to log in to your router and get the IP and MAC address of your dash button(s). On my router it showed up only by the MAC address, however I've read that it may show up as "Internet Device". It's important to know that the Dash button usually connects about 5 seconds after it is pressed, and that it may not show up on your router except immediately after it connects.
Note: it's been reported that you can shorten the response time to 1 second by configuring a static IP address for the dash button with your router.
Warning: I've found that the phone I used to configure the Dash button receives a notification every time the Dash button is pressed. For me this is acceptable because I don't need notifications from the Amazon app and can block them. If you normally use the Amazon app this could be a problem, and I don't currently have a solution for it.
The next big step is to create a script that will trigger whenever the dash button is pressed.
For reference you should probably try a debugging script. Here's the python script that helped me understand what was wrong with my configuration (it's based on a StackExchange query with some good sample code):
#!/usr/bin/env python3
from scapy.all import ARP, sniff


def arp_display(pkt):
    if pkt[ARP].op == 1:  # ARP request (who-has)
        return "ARP Probe from: " + pkt[ARP].hwsrc


# count=0 keeps sniffing until interrupted; prn's return value is printed
print(sniff(prn=arp_display, filter="arp", store=0, count=0))
Note: I've found that you need to have the Raspberry Pi plugged into Ethernet. Wi-Fi may not work to detect ARP packets. You should see a listing of ARP requests until you exit using Ctrl+C. If you've pressed the Dash button in this time, its MAC address should show up in the listing.
Now here's the script to use your dash button(s) as a trigger:
#!/usr/bin/env python3
from scapy.all import ARP, sniff
import os
import stat

# Trigger file
trigger_file = r'/tmp/voice_recognizer/trigger'

# Buttons
# Create a dictionary entry for each button
buttons = {}
buttons["Bounty"] = "##:##:##:##:##:##"
buttons["Charmin"] = "##:##:##:##:##:##"
buttons["Tide"] = "##:##:##:##:##:##"
# and so on. Add as many entries to the "buttons" variable as you need.

# Create a listing of MAC addresses to trigger on
# All values are taken from our 'buttons' dictionary above
mac_addresses = [buttons[button].lower() for button in buttons]


def arp_detect(pkt):
    if pkt[ARP].op == 1:  # ARP request (who-has)
        print(pkt[ARP].hwsrc)
        if pkt[ARP].hwsrc in mac_addresses:
            # Create the trigger directory if needed
            # Determine trigger directory
            trigger_dir = os.path.dirname(os.path.abspath(trigger_file))
            if not os.path.exists(trigger_dir):
                os.makedirs(trigger_dir)  # Create directory and parents if required
                # Store existing permissions of the trigger directory
                permissions = stat.S_IMODE(os.lstat(trigger_dir)[stat.ST_MODE])
                # Make the trigger directory world writeable
                os.chmod(trigger_dir, permissions | stat.S_IWOTH)
            # Touch the trigger file
            with open(trigger_file, 'a'):
                pass


# Scan for packets
# count=0 tells the sniff function to run forever
sniff(prn=arp_detect, filter="arp", store=0, count=0)
- You'll need to replace ##:##:##:##:##:## with your MAC address(es).
- If you changed the trigger file in the previous examples you must configure it here by setting the variable "trigger_file" (default is "/tmp/voice_recognizer/trigger" same as our trigger script above).
- Each dash button MAC address needs to be added to the "buttons" dictionary. The keys (i.e. "Bounty" or "Charmin") need to be unique for each dictionary entry, but you can add as many unique dictionary entries as you like.
- It has to be run as root to work.
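The script treats every button the same way. If you'd like to know which button was pressed (for logging, say), a reverse lookup over the same kind of dictionary is one way to do it. This is only a sketch: button_for_mac is a hypothetical helper and the addresses below are placeholders, not real buttons.

```python
def button_for_mac(hwsrc, buttons):
    """Return the name of the button whose MAC matches hwsrc, or None."""
    # Compare in lowercase so the case used in the dictionary doesn't matter
    by_mac = {mac.lower(): name for name, mac in buttons.items()}
    return by_mac.get(hwsrc.lower())


# Placeholder addresses for illustration only
example_buttons = {
    "Bounty": "1A:2B:3C:4D:5E:6F",
    "Tide": "AA:BB:CC:DD:EE:FF",
}
```

Inside a packet handler you could pass the packet's source MAC address to button_for_mac and only touch the trigger file when the result isn't None.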
Getting the script to run whenever you want:
On my system I saved the above script as "/root/src/detect_dash.py" (you'll probably need to "sudo mkdir /root/src" before you can save there). I'll use this location in the following examples. You'll need to make this script executable with "sudo chmod +x /root/src/detect_dash.py".
Finally let's get the script running whenever you want the button to be detected. You have a few options:
- By far the easiest is if you can open a terminal and leave it running. All you need to do is run
sudo /root/src/detect_dash.py
- If you are running from ssh or otherwise running without a desktop environment you can familiarize yourself with screen or tmux in order to get it running in the background. Unfortunately a full explanation of how to use those programs is outside the scope of this post.
- Another way to run it from ssh is to force it to the background (you won't get any debugging messages if there's a problem!). To run the script in the background type into a terminal
sudo nohup /root/src/detect_dash.py 0<&- 1>/dev/null 2>/dev/null &
- If you want it to run every time your system starts up you can use the root crontab to schedule it. To edit the root crontab enter "sudo crontab -e" in the command prompt and add a new line to the crontab with the text "@reboot sudo python3 /root/src/detect_dash.py". Reboot your system and see if it worked!
Finally we've reached the part where I clarify what was learned about the Trigger system of the Google AIY kit.
Triggers don't have any particular documentation. The getting started guide directs you to the GPIO trigger code (which is undocumented) and expects you to figure out the rest. Here's what I've figured out about how triggers work:
1. The "main.py" script kicks off everything. To implement a new trigger you need to make it aware of the trigger you'd like to use. In the code example above we added the lines that tell it what flag to accept for the trigger, what file and class to import, and what text to prompt when the system initializes the trigger.
2. From the trigger class definition (file.py) the system will run the start() function and wait for the trigger to make a callback (self.callback()).
3. When the callback occurs the system listens for a prompt, and responds.
4. After the system has responded it will return to step 2, retriggering the start() function.
Based on that, here is a bare-bones template for a new trigger class:
import threading  # Separate triggerer thread from main thread

from triggers.trigger import Trigger  # Google AIY Trigger class


class NewTrigger(Trigger):

    def __init__(self):
        super().__init__()  # Run the initializer of the parent Trigger class

    def start(self):
        ###############################
        # Perform any necessary setup #
        ###############################

        # Start the triggerer as a separate thread
        threading.Thread(target=self.triggerer).start()

    def triggerer(self):
        ######################################################
        # Perform whatever polling required for your trigger #
        ######################################################
        self.callback()  # Trigger voice recognition
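To make the start/callback cycle described above a bit more concrete, here's a small self-contained simulation you can run on any machine. Treat it as a sketch built on assumptions: the Trigger class below is my stand-in for the AIY base class (I'm assuming it stores a callback set through set_callback()), and ImmediateTrigger and run_cycles are invented for this example. The real main.py does considerably more.

```python
import threading


class Trigger:
    """Stand-in for the AIY Trigger base class (assumed interface)."""

    def __init__(self):
        self.callback = None

    def set_callback(self, callback):
        self.callback = callback

    def start(self):
        pass


class ImmediateTrigger(Trigger):
    """Toy trigger that fires as soon as it is started."""

    def start(self):
        # Fire the callback from a separate thread, like the template does
        threading.Thread(target=self.callback).start()


def run_cycles(trigger, cycles):
    """Mimic the main loop: start the trigger, wait for the callback, repeat."""
    fired = threading.Event()
    trigger.set_callback(fired.set)
    handled = 0
    for _ in range(cycles):
        trigger.start()  # Arm the trigger; called again on every cycle
        if not fired.wait(timeout=5):  # Block until the trigger calls back
            break
        fired.clear()
        handled += 1  # This is where listening and responding would happen
    return handled
```

Running run_cycles(ImmediateTrigger(), 3) goes around the loop three times, which mirrors how start() gets called again after each response.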
Thanks for reading! I hope this helped someone.
Feel free to leave questions in the comments section or on the Raspberry Pi forums.