Examples¶
- BLE Alerts
- BLE Scanner
- Eddystone Beacon
- iBeacon Beacon
- Eddystone Reader
- iBeacon Reader
- BLE Alerts Secure 1
- BLE Alerts Secure 2
- BLE Wifi
BLE Alerts¶
BLE Alerts
==========
An implementationof an Alert Notification device to show how services and characteristics can be easily created.
################################################################################
# BLE Alerts
#
# Created by Zerynth Team 2019 CC
# Author: G. Baldi
###############################################################################
import streams
#import the ESP32 BLE driver: a BLE capable VM is also needed!
from espressif.esp32ble import esp32ble as bledrv
# then import the BLE modue
from wireless import ble
streams.serial()
notifications_enabled = True
connected = False
# Let's define some callbacks
def value_cb(status,val):
# check incoming commands and enable/disable notifications
global notifications_enabled
print("Value changed to",val[0],val[1])
if val[0]==0:
print("Notifications enabled")
notifications_enabled = True
elif val[0]==2:
notifications_enabled = False
print("Notifications disabled")
else:
print("Notifications unchanged")
def connection_cb(address):
global connected
print("Connected to",ble.btos(address))
connected = True
def disconnection_cb(address):
global connected
print("Disconnected from",ble.btos(address))
# let's start advertising again
ble.start_advertising()
connected = False
try:
# initialize BLE driver
bledrv.init()
# Set GAP name and no security
ble.gap("ZNotifier",security=(ble.SECURITY_MODE_1,ble.SECURITY_LEVEL_1))
# add some GAP callbacks
ble.add_callback(ble.EVT_CONNECTED,connection_cb)
ble.add_callback(ble.EVT_DISCONNECTED,disconnection_cb)
# Create a GATT Service: let's try an Alert Notification Service
# (here are the specs: https://www.bluetooth.com/specifications/gatt/viewer?attributeXmlFile=org.bluetooth.service.alert_notification.xml)
s = ble.Service(0x1811)
# The Alert Notification service has multiple characteristics. Let's add them one by one
# Create a GATT Characteristic for counting new alerts.
# specs: https://www.bluetooth.com/specifications/gatt/viewer?attributeXmlFile=org.bluetooth.characteristic.supported_new_alert_category.xml
cn = ble.Characteristic(0x2A47, ble.NOTIFY | ble.READ,16,"New Alerts",ble.BYTES)
# Add the GATT Characteristic to the Service
s.add_characteristic(cn)
# Create anothr GATT Characteristic for enabling/disabling alerts
# specs: https://www.bluetooth.com/specifications/gatt/viewer?attributeXmlFile=org.bluetooth.characteristic.alert_notification_control_point.xml
cc = ble.Characteristic(0x2A44, ble.WRITE ,2,"Alerts control",ble.BYTES)
# Add the GATT Characteristic to the Service
s.add_characteristic(cc)
# Add a callback to be notified of changes
cc.set_callback(value_cb)
# Add the Service. You can create additional services and add them one by one
ble.add_service(s)
# Setup advertising to 50ms
ble.advertising(50)
# Start the BLE stack
ble.start()
# Now start advertising
ble.start_advertising()
except Exception as e:
print(e)
# loop forever
while True:
print(".")
if random(0,100)<50 and notifications_enabled and connected:
value = bytearray(cn.get_value())
value[0]=0 # simple alert type
if value[1]<255:
value[1]=value[1]+1 # add a notification
print("Adding a new notification, total of",value[1])
# the remaining 14 bytes can be some text
value[2:10] = "Zerynth!"
# set the new value. If ble notifications are enabled, the connected device will receive the change
cn.set_value(value)
sleep(5000)
BLE Scanner¶
BLE Scanner
===========
A simple example implementing a BLE packet scanner.
################################################################################
# BLE Scanner
#
# Created by Zerynth Team 2019 CC
# Author: G. Baldi
###############################################################################
import streams
#import the ESP32 BLE driver: a BLE capable VM is also needed!
from espressif.esp32ble import esp32ble as bledrv
# then import the BLE modue
from wireless import ble
streams.serial()
# Let's define some callbacks and constants
# How long to scan for in milliseconds
scan_time=30000
def scan_report_cb(data):
print("Detected packet from",ble.btos(data[4]),"containing",ble.btos(data[3]))
print(" packet is of type",data[0],"while address is of type",data[1])
print(" remote device has RSSI of",data[2])
def scan_start_cb(data):
print("Scan started")
def scan_stop_cb(data):
print("Scan stopped")
#let's start it up again
ble.start_scanning(scan_time)
try:
# initialize BLE driver
bledrv.init()
# Set GAP name and no security
ble.gap("Zerynth",security=(ble.SECURITY_MODE_1,ble.SECURITY_LEVEL_1))
ble.add_callback(ble.EVT_SCAN_REPORT,scan_report_cb)
ble.add_callback(ble.EVT_SCAN_STARTED,scan_start_cb)
ble.add_callback(ble.EVT_SCAN_STOPPED,scan_stop_cb)
#set scanning parameters: every 100ms for 50ms and no duplicates
ble.scanning(100,50,duplicates=0)
# Start the BLE stack
ble.start()
# Now start scanning for 30 seconds
ble.start_scanning(scan_time)
except Exception as e:
print(e)
# loop forever
while True:
print(".")
sleep(10000)
Eddystone Beacon¶
Eddystone Beacon
================
An implementation of a simple Eddystone Beacon advertising some predefined packets.
################################################################################
# Eddystone Beacon
#
# Created by Zerynth Team 2019 CC
# Author: G. Baldi
###############################################################################
import streams
import timers
#import the ESP32 BLE driver: a BLE capable VM is also needed!
from espressif.esp32ble import esp32ble as bledrv
# then import the BLE module and beacons
from wireless import ble
from wireless import ble_beacons as bb
streams.serial()
adv_content = 0
battery_level = 40
temperature = 23.2
pdu_count = 0
uptime = timers.timer()
uptime.start()
# create payloads to cycle through
payloads = [
bb.eddy_encode_uid("Zerynth","Python",-69), # UID Eddystone payload
bb.eddy_encode_url("https://www.zerynth.com",-69), # URL Eddystone payload
bb.eddy_encode_tlm(battery_level,temperature,pdu_count,uptime.get()/1000) # TLM Eddystone payload
]
# this callback will be called at the end of an advertising cycle.
# it is used to switch to the next content
def adv_stop_cb(data):
global pdu_count,adv_content
print("Advertising stopped")
adv_content = (adv_content+1)%3
if adv_content == 0:
# advertise UID
interval = 100
timeout = 10000
elif adv_content == 1:
# advertise URL
interval = 100
timeout = 15000
else:
# advertise TLM
interval = 100
timeout = 150
pdu_count+=1
payloads[2] = bb.eddy_encode_tlm(battery_level,temperature,pdu_count,uptime.get()/1000) # TLM Eddystone payload
payload = payloads[adv_content]
ble.advertising(interval,timeout=timeout,payload=payload,mode=ble.ADV_UNCN_UND)
ble.start_advertising()
print("Advertising restarted with",ble.btos(payload))
try:
# initialize BLE driver
bledrv.init()
# Set GAP name and no security
ble.gap("Zerynth",security=(ble.SECURITY_MODE_1,ble.SECURITY_LEVEL_1))
ble.add_callback(ble.EVT_ADV_STOPPED,adv_stop_cb)
# set advertising options: advertise every second with custom payload in non connectable undirected mode
# after 10 seconds, stop and change payload
ble.advertising(100,timeout=10000,payload=payloads[adv_content],mode=ble.ADV_UNCN_UND)
# Start the BLE stack
ble.start()
# Now start scanning for 30 seconds
ble.start_advertising()
except Exception as e:
print(e)
# loop forever
while True:
print(".")
sleep(10000)
iBeacon Beacon¶
iBeacon
=======
An Apple beacon advertising its own uuid.
################################################################################
# iBeacon
#
# Created by Zerynth Team 2019 CC
# Author: G. Baldi
###############################################################################
import streams
import timers
#import the ESP32 BLE driver: a BLE capable VM is also needed!
from espressif.esp32ble import esp32ble as bledrv
# then import the BLE module and beacons
from wireless import ble
from wireless import ble_beacons as bb
streams.serial()
try:
# initialize BLE driver
bledrv.init()
# Set GAP name and no security
ble.gap("Zerynth",security=(ble.SECURITY_MODE_1,ble.SECURITY_LEVEL_1))
# set advertising options: advertise every second with custom payload in non connectable undirected mode
ble.advertising(20,payload=bb.ibeacon_encode("fb0b57a2-8228-44cd-913a-94a122ba1206",10,3,-69),mode=ble.ADV_UNCN_UND)
# Start the BLE stack
ble.start()
# Now start advertising
ble.start_advertising()
except Exception as e:
print(e)
# loop forever
while True:
print(".")
sleep(10000)
Eddystone Reader¶
################################################################################
# Eddystone Reader
#
# Created by Zerynth Team 2019 CC
# Author: G. Baldi
###############################################################################
import streams
#import the ESP32 BLE driver: a BLE capable VM is also needed!
from espressif.esp32ble import esp32ble as bledrv
# then import the BLE modue
from wireless import ble
from wireless import ble_beacons as bb
streams.serial()
# Let's define some callbacks
def scan_report_cb(data):
pdata = data[3]
rssi = data[2]
try:
etype = bb.eddy_decode_type(pdata)
if etype==bb.EDDY_URL:
url, tx = bb.eddy_decode(pdata)
print("Eddy URL found with",url,tx,rssi)
elif etype==bb.EDDY_UID:
namespace, instance, tx = bb.eddy_decode(pdata)
print("Eddy UID found with",[hex(x) for x in namespace],[hex(x) for x in instance],tx,rssi)
elif etype==bb.EDDY_LTM:
battery,temperature, count, uptime = bb.eddy_decode(pdata)
print("Eddy LTM found with",battery,temperature,count,uptime,rssi)
# print("iBeacon found with",[hex(x) for x in uuid],major,minor,tx,rssi)
except Exception as e:
print("::")
def scan_stop_cb(data):
print("Scan stopped")
#let's start it up again
ble.start_scanning(3000)
try:
# initialize BLE driver
bledrv.init()
# Set GAP name and no security
ble.gap("Zerynth",security=(ble.SECURITY_MODE_1,ble.SECURITY_LEVEL_1))
ble.add_callback(ble.EVT_SCAN_REPORT,scan_report_cb)
ble.add_callback(ble.EVT_SCAN_STOPPED,scan_stop_cb)
#set scanning parameters: every 100ms for 100ms and no duplicates
ble.scanning(100,100,duplicates=0)
# Start the BLE stack
ble.start()
# Now start scanning for 3 seconds
ble.start_scanning(3000)
except Exception as e:
print(e)
# loop forever
while True:
print(".")
sleep(10000)
iBeacon Reader¶
iBeacon Reader
==============
A simple firmware to scan for iBeacons.
################################################################################
# iBeacon Reader
#
# Created by Zerynth Team 2019 CC
# Author: G. Baldi
###############################################################################
import streams
#import the ESP32 BLE driver: a BLE capable VM is also needed!
from espressif.esp32ble import esp32ble as bledrv
# then import the BLE modue
from wireless import ble
from wireless import ble_beacons as bb
streams.serial()
# Let's define some callbacks
def scan_report_cb(data):
pdata = data[3]
rssi = data[2]
try:
uuid, major, minor, tx = bb.ibeacon_decode(pdata)
print("iBeacon found with",[hex(x) for x in uuid],major,minor,tx,rssi)
except Exception as e:
print("::")
def scan_stop_cb(data):
print("Scan stopped")
#let's start it up again
ble.start_scanning(3000)
try:
# initialize BLE driver
bledrv.init()
# Set GAP name and no security
ble.gap("Zerynth",security=(ble.SECURITY_MODE_1,ble.SECURITY_LEVEL_1))
ble.add_callback(ble.EVT_SCAN_REPORT,scan_report_cb)
ble.add_callback(ble.EVT_SCAN_STOPPED,scan_stop_cb)
#set scanning parameters: every 100ms for 100ms and no duplicates
ble.scanning(100,100,duplicates=0)
# Start the BLE stack
ble.start()
# Now start scanning for 3 seconds
ble.start_scanning(3000)
except Exception as e:
print(e)
# loop forever
while True:
print(".")
sleep(10000)
BLE Alerts Secure 1¶
BLE Alerts with Security 1
==========================
An implementationof an Alert Notification device to show how services and characteristics can be easily created. It also features secure connections with bonding.
################################################################################
# BLE Alerts with security
#
# Created by Zerynth Team 2019 CC
# Author: G. Baldi
###############################################################################
import streams
#import the ESP32 BLE driver: a BLE capable VM is also needed!
from espressif.esp32ble import esp32ble as bledrv
# then import the BLE modue
from wireless import ble
streams.serial()
notifications_enabled = True
connected = False
# Let's define some callbacks
def value_cb(status,val):
# check incoming commands and enable/disable notifications
global notifications_enabled
print("Value changed to",val[0],val[1])
if val[0]==0:
print("Notifications enabled")
notifications_enabled = True
elif val[0]==2:
notifications_enabled = False
print("Notifications disabled")
else:
print("Notifications unchanged")
def connection_cb(address):
global connected
print("Connected to",ble.btos(address))
connected = True
def disconnection_cb(address):
global connected
print("Disconnected from",ble.btos(address))
# let's start advertising again
ble.start_advertising()
connected = False
# Let's define some security callbacks
def show_key_cb(passkey):
print("ENTER THIS PIN ON THE MASTER:",passkey)
try:
# initialize BLE driver
bledrv.init()
# Set GAP name and LEVEL 2 security
# !!! If security is not set, no secure connection will be possible
ble.gap("ZNotifier",security=(ble.SECURITY_MODE_1,ble.SECURITY_LEVEL_2))
# add some GAP callbacks
ble.add_callback(ble.EVT_CONNECTED,connection_cb)
ble.add_callback(ble.EVT_DISCONNECTED,disconnection_cb)
# Create a GATT Service: let's try an Alert Notification Service
# (here are the specs: https://www.bluetooth.com/specifications/gatt/viewer?attributeXmlFile=org.bluetooth.service.alert_notification.xml)
s = ble.Service(0x1811)
# The Alert Notification service has multiple characteristics. Let's add them one by one
# Create a GATT Characteristic for counting new alerts.
# specs: https://www.bluetooth.com/specifications/gatt/viewer?attributeXmlFile=org.bluetooth.characteristic.supported_new_alert_category.xml
cn = ble.Characteristic(0x2A47, ble.NOTIFY | ble.READ,16,"New Alerts",ble.BYTES)
# Add the GATT Characteristic to the Service
s.add_characteristic(cn)
# Create anothr GATT Characteristic for enabling/disabling alerts
# specs: https://www.bluetooth.com/specifications/gatt/viewer?attributeXmlFile=org.bluetooth.characteristic.alert_notification_control_point.xml
cc = ble.Characteristic(0x2A44, ble.WRITE ,2,"Alerts control",ble.BYTES)
# Add the GATT Characteristic to the Service
s.add_characteristic(cc)
# Add a callback to be notified of changes
cc.set_callback(value_cb)
# Add the Service. You can create additional services and add them one by one
ble.add_service(s)
# Configure security. BLE security is very flexible.
# In this case we declare that the device has only an output capability (CAP_DISPLAY_ONLY),
# that we require a bonding (storage of the keys after pairing)
# and that we want both secure connection and main in the middle protection.
# Since we have CAP_DISPLAY_ONLY, we also declare a passkey that will be shown to the user
# to be entered on the master (i.e. the smartphone) to finalize the bonding.
ble.security(
capabilities=ble.CAP_DISPLAY_ONLY,
bonding=ble.AUTH_BOND,
scheme=ble.AUTH_SC|ble.AUTH_MITM,
key_size=16,
passkey=225575)
# To do so, we need a callback to display the passkey when needed
ble.add_callback(ble.EVT_SHOW_PASSKEY,show_key_cb)
# Setup advertising to 50ms
ble.advertising(50)
# Start the BLE stack
ble.start()
# Now start advertising
ble.start_advertising()
except Exception as e:
print(e)
# loop forever
while True:
print(".")
if random(0,100)<50 and notifications_enabled and connected:
value = bytearray(cn.get_value())
value[0]=0 # simple alert type
if value[1]<255:
value[1]=value[1]+1 # add a notification
print("Adding a new notification, total of",value[1])
# the remaining 14 bytes can be some text
value[2:10] = "Zerynth!"
# set the new value. If ble notifications are enabled, the connected device will receive the change
cn.set_value(value)
sleep(5000)
BLE Alerts Secure 2¶
BLE Alerts with Security 2
==========================
An implementationof an Alert Notification device to show how services and characteristics can be easily created. It also features secure connections with bonding using confirmation capabilities of the device.
################################################################################
# BLE Alerts with Security 2
#
# Created by Zerynth Team 2019 CC
# Author: G. Baldi
###############################################################################
import streams
#import the ESP32 BLE driver: a BLE capable VM is also needed!
from espressif.esp32ble import esp32ble as bledrv
# then import the BLE modue
from wireless import ble
streams.serial()
notifications_enabled = True
connected = False
# Let's define some callbacks
def value_cb(status,val):
# check incoming commands and enable/disable notifications
global notifications_enabled
print("Value changed to",val[0],val[1])
if val[0]==0:
print("Notifications enabled")
notifications_enabled = True
elif val[0]==2:
notifications_enabled = False
print("Notifications disabled")
else:
print("Notifications unchanged")
def connection_cb(address):
global connected
print("Connected to",ble.btos(address))
connected = True
def disconnection_cb(address):
global connected
print("Disconnected from",ble.btos(address))
# let's start advertising again
ble.start_advertising()
connected = False
# Let's define some security callbacks
def match_key_cb(passkey):
print("MASTER KEY IS:",passkey,"CAN WE PROCEED? PRESS BUTTON FOR YES")
pinMode(BTN0,INPUT)
for i in range(5):
if digitalRead(BTN0)!=0:
ble.confirm_passkey(1)
print("Confirmed!")
return
sleep(1000)
ble.confirm_passkey(0)
print("Not confirmed!")
try:
# initialize BLE driver
bledrv.init()
# Set GAP name and LEVEL 2 security
# !!! If security is not set, no secure connection will be possible
ble.gap("ZNotifier",security=(ble.SECURITY_MODE_1,ble.SECURITY_LEVEL_2))
# add some GAP callbacks
ble.add_callback(ble.EVT_CONNECTED,connection_cb)
ble.add_callback(ble.EVT_DISCONNECTED,disconnection_cb)
# Create a GATT Service: let's try an Alert Notification Service
# (here are the specs: https://www.bluetooth.com/specifications/gatt/viewer?attributeXmlFile=org.bluetooth.service.alert_notification.xml)
s = ble.Service(0x1811)
# The Alert Notification service has multiple characteristics. Let's add them one by one
# Create a GATT Characteristic for counting new alerts.
# specs: https://www.bluetooth.com/specifications/gatt/viewer?attributeXmlFile=org.bluetooth.characteristic.supported_new_alert_category.xml
cn = ble.Characteristic(0x2A47, ble.NOTIFY | ble.READ,16,"New Alerts",ble.BYTES)
# Add the GATT Characteristic to the Service
s.add_characteristic(cn)
# Create anothr GATT Characteristic for enabling/disabling alerts
# specs: https://www.bluetooth.com/specifications/gatt/viewer?attributeXmlFile=org.bluetooth.characteristic.alert_notification_control_point.xml
cc = ble.Characteristic(0x2A44, ble.WRITE ,2,"Alerts control",ble.BYTES)
# Add the GATT Characteristic to the Service
s.add_characteristic(cc)
# Add a callback to be notified of changes
cc.set_callback(value_cb)
# Add the Service. You can create additional services and add them one by one
ble.add_service(s)
# Configure security. BLE security is very flexible.
# In this case we declare that the device has only an output capability with yes o or no input (CAP_DISPLAY_YES_NO),
# that we require a bonding (storage of the keys after pairing)
# and that we want both secure connection and main in the middle protection.
ble.security(
capabilities=ble.CAP_DISPLAY_YES_NO,
bonding=ble.AUTH_BOND,
scheme=ble.AUTH_SC|ble.AUTH_MITM,
key_size=16)
# To do so, we need a callback to accept the passkey when needed
ble.add_callback(ble.EVT_MATCH_PASSKEY,match_key_cb)
# Setup advertising to 50ms
ble.advertising(50)
# Start the BLE stack
ble.start()
# Now start advertising
ble.start_advertising()
except Exception as e:
print(e)
# Uncomment the following lines to delte bonded devices!
for bond in ble.bonded():
print("Removing bonded:",ble.btos(bond))
ble.remove_bonded(bond)
# loop forever
while True:
print(".")
if random(0,100)<50 and notifications_enabled and connected:
value = bytearray(cn.get_value())
value[0]=0 # simple alert type
if value[1]<255:
value[1]=value[1]+1 # add a notification
print("Adding a new notification, total of",value[1])
# the remaining 14 bytes can be some text
value[2:10] = "Zerynth!"
# set the new value. If ble notifications are enabled, the connected device will receive the change
cn.set_value(value)
sleep(5000)
BLE Wifi¶
BLE Wifi
========
A BLE scanner sending packet data to a tcp socket ovr a Wifi connection.
################################################################################
# BLE Wifi
#
# Created by Zerynth Team 2019 CC
# Author: G. Baldi
###############################################################################
import streams
#import the ESP32 BLE driver: a BLE capable VM is also needed!
from espressif.esp32ble import esp32ble as bledrv
# then import the BLE modue
from wireless import ble
# import wifi modules
from espressif.esp32net import esp32wifi as wifi_driver
from wireless import wifi
import socket
import gc
streams.serial()
# Let's define some callbacks and constants
# How long to scan for in milliseconds
scan_time=30000
# tcp socket
ss=None
def scan_report_cb(data):
try:
print("Detected packet from",ble.btos(data[4]),"containing",ble.btos(data[3]))
print(" packet is of type",data[0],"while address is of type",data[1])
print(" remote device has RSSI of",data[2])
# send to socket
ss.sendall(ble.btos(data[3]))
ss.sendall("\n")
except Exception as e:
print("send",e)
def scan_start_cb(data):
print("Scan started")
def scan_stop_cb(data):
print("Scan stopped")
#let's start it up again
ble.start_scanning(scan_time)
try:
# initialize wifi
wifi_driver.auto_init()
for i in range(10):
try:
print('connecting to wifi...')
# place here your wifi configuration
wifi.link("SSID",wifi.WIFI_WPA2,"password")
break
except Exception as e:
print(e)
# let's open a socket to forward the BLE packets
print("Opening socket...")
ss=socket.socket()
# you can run "nc -l -p 8082" on your machine
# and change the ip below
ss.connect(("192.168.71.52",8082))
print("Starting BLE...")
# initialize BLE driver
bledrv.init()
# Set GAP name and no security
ble.gap("Zerynth",security=(ble.SECURITY_MODE_1,ble.SECURITY_LEVEL_1))
ble.add_callback(ble.EVT_SCAN_REPORT,scan_report_cb)
ble.add_callback(ble.EVT_SCAN_STARTED,scan_start_cb)
ble.add_callback(ble.EVT_SCAN_STOPPED,scan_stop_cb)
#set scanning parameters: every 100ms for 50ms and no duplicates
ble.scanning(100,50,duplicates=0)
# Start the BLE stack
ble.start()
# Now start scanning for 30 seconds
ble.start_scanning(scan_time)
except Exception as e:
print(e)
# loop forever
while True:
print(".",gc.info())
sleep(10000)
ss.sendall("::\n")