Examples¶
- Hello 4ZeroBox
- Sensor Reading
- Modbus Serial Interface
- 4ZeroBox meets ZDM
- 4ZeroBox meets ZDM with GSM
- Firmware Over The Air Update
Hello 4ZeroBox¶
Hello 4ZeroBox
==============
The simplest example to try when getting started with embedded development.
tags: [First Steps, Serial]
groups:[First Steps]
###############################################################################
# Hello 4ZeroBox
#
# Created: 2020-09-10 15:45:48.789211
#
################################################################################
# import the streams module, it is needed to send data around
import streams
import sfw
# Set Watchdog timeout
# (60000 is presumably milliseconds; confirm units and the first argument against the sfw docs)
sfw.watchdog(0, 60000)
# open the default serial port, the output will be visible in the serial console
streams.serial()
# loop forever
while True:
    print("Hello Zerynth 4ZeroBox!") # print automatically knows where to print!
    sleep(1000)
    # Reset Watchdog timer on every iteration of the loop
    sfw.kick()
Sensor Reading¶
Sensor Reading
==============
This example shows how to initialize and configure the ADC channels of the 4ZeroBox (analog 4-20mA and 0-10V, resistive, and current channels).
################################################################################
# 4ZeroBox Sensor Reading
#
# Created: 2020-09-10 15:45:48.789211
#
################################################################################
import streams
import mcu
import json
import threading as th
import sfw
from fourzerobox import fourzerobox
# Lock used by the main loop to pace the reader thread
core_sample_lock = th.Lock()
# Set to True by the reader thread right before it blocks on the lock,
# and back to False as soon as it has acquired it
ready = True
# Set Watchdog timeout
sfw.watchdog(0, 60000)
# Init sys
streams.serial()
print('init')
# Reference table for the non-linear NTC sensor
# (handed to set_conversion_resistive during ADC configuration; units are not shown here, TODO confirm)
ref_table = [329.5,247.7,188.5,144.1,111.3,86.43,67.77,53.41,42.47,33.90,27.28,22.05,17.96,14.69,12.09,10.00,8.313,
             6.940,5.827,4.911,4.160,3.536,3.020,2.588,2.228,1.924,1.668,1.451,1.266,1.108,0.9731,0.8572,0.7576]
# FourZero Var
fzbox = None
try:
    # Create FourZerobox Instance
    fzbox = fourzerobox.FourZeroBox(i2c_clk=100000)
except Exception as e:
    # The board object could not be created: print the error and reboot
    print(e)
    mcu.reset()
# Worker thread body: every time the main loop releases the lock, sample the
# configured channels and print the readings on the serial console.
def read_event_handler():
    global ready
    while True:
        try:
            # Advertise that we are idle, then block until the main loop lets us run
            ready = True
            core_sample_lock.acquire()
            ready = False
            print("======== reading")
            # 4-20mA channel 1, resistive channel 1 and power channel 1, read in this order
            samples = (
                (" - analog:", fzbox.read_420(1)),
                (" - temp:", fzbox.read_resistive(1)),
                (" - power:", fzbox.read_power(1))
            )
            for label, reading in samples:
                print(label, reading)
            print("======== done")
            # reverse green blink at the end of each cycle
            fzbox.reverse_pulse('G', 100)
        except Exception as err:
            # Any failure: report it, signal the error on the board and reboot
            print("Generic Error:", err)
            fzbox.error_cloud()
            mcu.reset()
print("core init...")
# Take the lock in the main thread so the reader thread blocks until the first release
core_sample_lock.acquire()
try:
    print("adc config...")
    # Config FourZeroBox ADC channels
    fzbox.config_adc_010_420(1, 3, 0)
    fzbox.config_adc_resistive(1, 2, 0)
    fzbox.config_adc_current(1, 2, 7)
    # Config FourZeroBox ADC conversion parameters
    fzbox.set_conversion_010_420(1, 0, 100, 0, 0, 100)
    fzbox.set_conversion_resistive(1, -50, ref_table, 5, 0, None)
    fzbox.set_conversion_current(1, 100, 5, 2000, 220, 0)
    print("adc config done")
except Exception as e:
    # ADC configuration failed: print the error and reboot
    print(e)
    mcu.reset()
try:
    print("Start read_event_handler thread")
    thread(read_event_handler)
    print('start main')
    # Main Loop
    while True:
        sleep(10000)
        # Sync between main thread and read_event_handler thread:
        # release the lock only when the reader has flagged itself as waiting
        if ready:
            core_sample_lock.release()
        # Reset Watchdog timer
        sfw.kick()
except Exception as e:
    # Main loop failure: print the error, pulse the red LED and reboot
    print (e)
    fzbox.pulse('R', 10000)
    mcu.reset()
Modbus Serial Interface¶
Modbus Serial Interface
=======================
This example shows how to initialize and establish a serial Modbus communication.
###############################################################################
# Modbus Serial Interface
#
# Created: 2020-09-10 15:45:48.789211
#
################################################################################
from modbus import modbus
import streams
from fourzerobox import fourzerobox
import sfw
streams.serial()
# Set Watchdog timeout
sfw.watchdog(0, 60000)
# Write one 16-bit value to a single register.
#   master   - Modbus master exposing write_register(register, value)
#   register - address of the register to write
#   values   - optional list of bits, least significant first (values[i] lands in bit i)
#   num      - optional integer written as-is; ignored when values is given
# Returns whatever master.write_register returns.
# Raises ValueError when neither values nor num is given, or when the resulting
# number does not fit in an unsigned 16-bit register (negative or > 0xffff).
def Write_One_Register(master, register, values=None, num=None):
    if values is None and num is None:
        raise ValueError
    if values is not None:
        # Pack the bit list into an integer, bit 0 first
        num = 0
        for i, bit in enumerate(values):
            num += bit << i
    # A register holds 16 unsigned bits: reject anything outside 0..0xffff
    if num < 0 or num > 0xffff:
        raise ValueError
    result = master.write_register(register, num)
    print("Value", result, "successfully written in the register", register)
    return result
# Read a single holding register and return its lowest bits as a list.
#   master   - Modbus master exposing read_holding(register, count)
#   register - address of the register to read
#   nbits    - how many bits to return, least significant first
#              (defaults to 10 as in the original example; use 16 for the full register)
# Returns a list of nbits integers, each 0 or 1; element i is bit i of the register.
def Read_One_Register(master, register, nbits=10):
    num = master.read_holding(register, 1)[0]
    return [(num >> i) & 1 for i in range(nbits)]
# Modbus master handle. It stays None until the serial link has been created, so
# the error handler below can tell whether there is anything to close.
master_in = None
try:
    fzbox = fourzerobox.FourZeroBox(i2c_clk=100000)
    print("fzbox init")
    config_serial = modbus.ConfigSerial(SERIAL1, 9600, rs485en=fourzerobox.RS485EN)
    print("serial configurated")
    # change the identifier (slave address) if needed
    master_in = modbus.ModbusSerial(1, cfg = config_serial)
    print("start exchange messages")
    # write list of bits on register with address 2 (change it if needed)
    try:
        result = Write_One_Register(master_in, 2, values=[1, 0, 1, 0, 0, 0, 0, 0, 0, 0]) # max number of values is 16 elements -> register 16 bit
        # read register 2 and check the result
        result = Read_One_Register(master_in, 2)
        print("Get holding register 2: ", result)
    except Exception as e:
        # A failed exchange is only reported; the second exchange still runs
        print(e)
    try:
        # write single num value on register with address 3 (change it if needed)
        result = Write_One_Register(master_in, 3, num=3)
        # read register 3 and check the result
        result = Read_One_Register(master_in, 3)
        print("Get holding register 3: ", result)
    except Exception as e:
        print(e)
    sfw.kick()
    master_in.close()
except Exception as e:
    print("Exception ", e)
    # Close the link only if it was actually opened: a failure before ModbusSerial()
    # leaves master_in unset, and closing it would raise and hide the original error
    if master_in is not None:
        master_in.close()
4ZeroBox meets ZDM¶
4ZeroBox meets ZDM
==================
Basic example that connects the 4ZeroBox to the Zerynth Device Manager and sends data to the cloud service.
###############################################################################
# 4ZeroBox meets ZDM
#
# Created: 2020-09-10 15:45:48.789211
#
###############################################################################
from bsp.drivers import wifi
import streams
from zdm import zdm
import threading as th
import sfw
import mcu
from fourzerobox import fourzerobox
# Lock used by the main loop to pace the publisher thread
core_sample_lock = th.Lock()
# Set to True by the publisher thread right before it blocks on the lock,
# and back to False as soon as it has acquired it
ready = True
# Set Watchdog timeout
sfw.watchdog(0, 60000)
# Init sys
streams.serial()
print('init')
# Reference table for the non-linear NTC sensor
# (handed to set_conversion_resistive during ADC configuration; units are not shown here, TODO confirm)
ref_table = [329.5,247.7,188.5,144.1,111.3,86.43,67.77,53.41,42.47,33.90,27.28,22.05,17.96,14.69,12.09,10.00,8.313,
             6.940,5.827,4.911,4.160,3.536,3.020,2.588,2.228,1.924,1.668,1.451,1.266,1.108,0.9731,0.8572,0.7576]
# FourZero Var
fzbox = None
try:
    # Create FourZerobox Instance
    fzbox = fourzerobox.FourZeroBox(i2c_clk=100000)
except Exception as e:
    # The board object could not be created: print the error and reboot
    print(e)
    mcu.reset()
# Worker thread body: every time the main loop releases the lock, sample the
# sensors, publish the readings to the ZDM and blink a status LED.
def pub_event_handler():
    global ready
    while True:
        try:
            # Advertise that we are idle, then block until the main loop lets us run
            ready = True
            core_sample_lock.acquire()
            ready = False
            print("======== reading")
            # 4-20mA channel 1 first, then resistive channel 1
            analog_val = fzbox.read_420(1)
            temperature = fzbox.read_resistive(1)
            print(" - temp:", temperature)
            print(" - analog:", analog_val)
            # Payload published under the "data" tag
            payload = {'temp': temperature, 'analog': analog_val}
            print("======== done")
            device.publish(payload, "data")
            sleep(100)
            # yellow blink if the signal is weak (rssi below -70), green blink otherwise
            if fzbox.get_rssi() < -70:
                blink_color = 'Y'
            else:
                blink_color = 'G'
            fzbox.reverse_pulse(blink_color, 100)
        except Exception as err:
            # Any failure: report it, signal the error on the board and reboot
            print('Publish exception: ', err)
            fzbox.error_cloud()
            mcu.reset()
try:
    # connection to wifi network
    fzbox.net_init()
    print("Connecting to wifi ...")
    # "SSID" and "PWD" are placeholders for the network name and password
    fzbox.net_connect("SSID","PWD")
    print("... done")
    print("Connecting to ZDM ...")
    device = zdm.Device()
    # connect the device to the ZDM
    device.connect()
    print("... done")
except Exception as e:
    # Network or ZDM connection failed: print the error, pulse the red LED and reboot
    print (e)
    fzbox.pulse('R', 10000)
    mcu.reset()
# Take the lock in the main thread so the publisher thread blocks until the first release
core_sample_lock.acquire()
try:
    print("adc config...")
    # Config FourZeroBox ADC channels
    fzbox.config_adc_010_420(1, 3, 0)
    fzbox.config_adc_resistive(1, 2, 0)
    # Config FourZeroBox ADC conversion parameters
    fzbox.set_conversion_010_420(1, 0, 100, 0, 0, 100)
    fzbox.set_conversion_resistive(1, -50, ref_table, 5, 0, None)
    print("adc config done")
except Exception as e:
    # ADC configuration failed: print the error and reboot
    print(e)
    mcu.reset()
try:
    print("core init done")
    # Start pub_event_handler thread
    thread(pub_event_handler)
    print('start main')
    # Main Loop
    while True:
        sleep(10000)
        # Sync between main thread and pub_event_handler thread:
        # release the lock only when the publisher has flagged itself as waiting
        if ready:
            core_sample_lock.release()
        # Reset Watchdog timer
        sfw.kick()
except Exception as e:
    # Main loop failure: print the error, pulse the red LED and reboot
    print (e)
    fzbox.pulse('R', 10000)
    mcu.reset()
4ZeroBox meets ZDM with GSM¶
4ZeroBox meets ZDM with GSM
===========================
Basic example that connects the 4ZeroBox to the Zerynth Device Manager over GSM and sends data to the cloud service.
###############################################################################
# 4ZeroBox meets ZDM with GSM
#
# Created: 2020-09-10 15:45:48.789211
#
###############################################################################
from bsp.drivers import gsm
import streams
from zdm import zdm
import threading as th
import sfw
import mcu
from fourzerobox import fourzerobox
import requests
import json
# Lock used by the main loop to pace the publisher thread
core_sample_lock = th.Lock()
# Set to True by the publisher thread right before it blocks on the lock,
# and back to False as soon as it has acquired it
ready = True
# Set Watchdog timeout
sfw.watchdog(0, 60000)
# Init sys
streams.serial()
print('init')
# Reference table for the non-linear NTC sensor
# (handed to set_conversion_resistive during ADC configuration; units are not shown here, TODO confirm)
ref_table = [329.5,247.7,188.5,144.1,111.3,86.43,67.77,53.41,42.47,33.90,27.28,22.05,17.96,14.69,12.09,10.00,8.313,
             6.940,5.827,4.911,4.160,3.536,3.020,2.588,2.228,1.924,1.668,1.451,1.266,1.108,0.9731,0.8572,0.7576]
# FourZero Var
fzbox = None
# ZDM device handle; defined up front because retrieve_ts tests it before using it
device = None
try:
    # Create FourZerobox Instance
    fzbox = fourzerobox.FourZeroBox(i2c_clk=100000)
except Exception as e:
    # The board object could not be created: print the error and reboot
    print(e)
    mcu.reset()
# Worker thread body: every time the main loop releases the lock, sample the
# sensors, publish the readings to the ZDM and blink a status LED.
def pub_event_handler():
    global ready
    while True:
        try:
            # Advertise that we are idle, then block until the main loop lets us run
            ready = True
            core_sample_lock.acquire()
            ready = False
            print("======== reading")
            # 4-20mA channel 1 first, then resistive channel 1
            analog_val = fzbox.read_420(1)
            temperature = fzbox.read_resistive(1)
            print(" - temp:", temperature)
            print(" - analog:", analog_val)
            # Payload published under the "data" tag
            payload = {'temp': temperature, 'analog': analog_val}
            print("======== done")
            device.publish(payload, "data")
            sleep(100)
            # yellow blink if the signal is weak (rssi below -70), green blink otherwise
            if fzbox.get_rssi() < -70:
                blink_color = 'Y'
            else:
                blink_color = 'G'
            fzbox.reverse_pulse(blink_color, 100)
        except Exception as err:
            # Any failure: report it, signal the error on the board and reboot
            print('Publish exception: ', err)
            fzbox.error_cloud()
            mcu.reset()
# Time source handed to zdm.Device as time_function.
# Fetches http://now.zerynth.com/ and returns the "now" -> "epoch" field of the
# JSON reply as an int. If the ZDM device is currently connected, its MQTT
# session is disconnected first.
def retrieve_ts():
    if device and device.connected():
        # disconnect must be called: the bare attribute reference did nothing
        device.mqtt.disconnect()
        # give the disconnection time to complete before opening the HTTP request
        # (placement inside the if is inferred, the source lost its indentation)
        sleep(1000)
    print(fzbox.net.rtc())
    res = requests.get("http://now.zerynth.com/")
    js = json.loads(res.content)
    return int(js["now"]["epoch"])
try:
    # Bring up the network interface and attach using the given APN
    # ("apn" is a placeholder for the operator's access point name)
    fzbox.net_init()
    print("attaching ...")
    fzbox.net_connect("apn")
    print("... done")
    print("Connecting to ZDM ...")
    # retrieve_ts supplies the timestamp used by the ZDM client
    device = zdm.Device(time_function=retrieve_ts)
    # connect the device to the ZDM
    device.connect()
    print("... done")
except Exception as e:
    # Network or ZDM connection failed: print the error, pulse the red LED and
    # reboot, as the other ZDM examples do. Continuing would leave device unset
    # or unconnected and the publisher thread would fail on its first publish.
    print (e)
    fzbox.pulse('R', 10000)
    mcu.reset()
# Take the lock in the main thread so the publisher thread blocks until the first release
core_sample_lock.acquire()
try:
    print("adc config...")
    # Config FourZeroBox ADC channels
    fzbox.config_adc_010_420(1, 3, 0)
    fzbox.config_adc_resistive(1, 2, 0)
    # Config FourZeroBox ADC conversion parameters
    fzbox.set_conversion_010_420(1, 0, 100, 0, 0, 100)
    fzbox.set_conversion_resistive(1, -50, ref_table, 5, 0, None)
    print("adc config done")
except Exception as e:
    # ADC configuration failed: print the error and reboot
    print(e)
    mcu.reset()
try:
    print("core init done")
    # Start pub_event_handler thread
    thread(pub_event_handler)
    print('start main')
    # Main Loop
    while True:
        sleep(10000)
        # Sync between main thread and pub_event_handler thread:
        # release the lock only when the publisher has flagged itself as waiting
        if ready:
            core_sample_lock.release()
        # Reset Watchdog timer
        sfw.kick()
except Exception as e:
    # Main loop failure: print the error, pulse the red LED and reboot
    print (e)
    fzbox.pulse('R', 10000)
    mcu.reset()
Firmware Over The Air Update¶
Firmware Over The Air Update
============================
Connect your 4ZeroBox to ZDM and start updating the firmware seamlessly.
In this example, a FOTA callback function is defined, which is called during the FOTA update steps.
The FOTA callback allows you to accept or refuse a FOTA from your devices using the return value.
If the callback returns True, the device will accept the FOTA update request; if the callback returns False,
the device will refuse it.
Try editing the function and run your tests using the ZDM FOTA commands.
###############################################################################
# Firmware Over The Air Update
#
# Created: 2020-09-10 15:45:48.789211
#
###############################################################################
from bsp.drivers import wifi
import streams
from zdm import zdm
import threading as th
import sfw
import mcu
from fourzerobox import fourzerobox
# Lock used by the main loop to pace the publisher thread
core_sample_lock = th.Lock()
# Set to True by the publisher thread right before it blocks on the lock,
# and back to False as soon as it has acquired it
ready = True
# Set Watchdog timeout
sfw.watchdog(0, 60000)
# Init sys
streams.serial()
print('init')
# firmware version to change and check after the FOTA procedure
# (printed by the main loop on every iteration)
fw_version = "v01"
# Reference table for the non-linear NTC sensor
# (handed to set_conversion_resistive during ADC configuration; units are not shown here, TODO confirm)
ref_table = [329.5,247.7,188.5,144.1,111.3,86.43,67.77,53.41,42.47,33.90,27.28,22.05,17.96,14.69,12.09,10.00,8.313,
             6.940,5.827,4.911,4.160,3.536,3.020,2.588,2.228,1.924,1.668,1.451,1.266,1.108,0.9731,0.8572,0.7576]
# FourZero Var
fzbox = None
try:
    # Create FourZerobox Instance
    fzbox = fourzerobox.FourZeroBox(i2c_clk=100000)
except Exception as e:
    # The board object could not be created: print the error and reboot
    print(e)
    mcu.reset()
# Worker thread body: every time the main loop releases the lock, sample the
# sensors, publish the readings to the ZDM and blink a status LED.
def pub_event_handler():
    global ready
    while True:
        try:
            # Advertise that we are idle, then block until the main loop lets us run
            ready = True
            core_sample_lock.acquire()
            ready = False
            print("======== reading")
            # 4-20mA channel 1 first, then resistive channel 1
            analog_val = fzbox.read_420(1)
            temperature = fzbox.read_resistive(1)
            print(" - temp:", temperature)
            print(" - analog:", analog_val)
            # Payload published under the "data" tag
            payload = {'temp': temperature, 'analog': analog_val}
            print("======== done")
            device.publish(payload, "data")
            sleep(100)
            # yellow blink if the signal is weak (rssi below -70), green blink otherwise
            if fzbox.get_rssi() < -70:
                blink_color = 'Y'
            else:
                blink_color = 'G'
            fzbox.reverse_pulse(blink_color, 100)
        except Exception as err:
            # Any failure: report it, signal the error on the board and reboot
            print('Publish exception: ', err)
            fzbox.error_cloud()
            mcu.reset()
# remote color blink
# ZDM job handler: blinks the LED named by arg["led"].
#   device - the ZDM device the job arrived on (unused here)
#   arg    - job arguments; "led" must be one of 'R','G','B','M','Y','C','W'
# Returns {"res": "OK"} after blinking, or {"error": "bad args"} when "led" is
# missing or is not one of the supported colors.
def jblink(device, arg):
    if "led" in arg and arg["led"] in ('R', 'G', 'B', 'M', 'Y', 'C', 'W'):
        fzbox.reverse_pulse(arg["led"], 100)
        return {"res": "OK"}
    # Both a missing key and an unknown color end here, so the job always gets a reply
    return {"error": "bad args"}
# remote async read sensor
# ZDM job handler: reads the sensor named by arg["var"] on demand.
#   device - the ZDM device the job arrived on (unused here)
#   arg    - job arguments; "var" selects "analog" (4-20mA channel 1) or
#            "temp" (resistive channel 1)
# Returns {<var>: <reading>}; an unknown var yields {<var>: "error"} and a
# missing "var" key yields {"error": "bad args"}.
def jread(device, arg):
    if "var" not in arg:
        # Without a variable name there is nothing to read: reply explicitly
        return {"error": "bad args"}
    var = arg["var"]
    if var == "analog":
        val = fzbox.read_420(1)
    elif var == "temp":
        val = fzbox.read_resistive(1)
    else:
        val = "error"
    return {var: val}
# custom jobs
# Job name -> handler table, passed to zdm.Device through jobs_dict below
my_jobs = {
    'blink': jblink,
    'read_async': jread,
}
try:
    fzbox.net_init()
    print("Connecting to wifi ...")
    # "SSID" and "PWD" are placeholders for the network name and password
    fzbox.net_connect("SSID","PWD")
    print("... done")
    print("Connecting to ZDM ...")
    # NOTE(review): only jobs_dict is passed here; no FOTA callback is registered
    # in this script, so confirm how FOTA requests are accepted
    device = zdm.Device(jobs_dict=my_jobs)
    # connect the device to the ZDM
    device.connect()
    print("... done")
except Exception as e:
    # Network or ZDM connection failed: print the error, pulse the red LED and reboot
    print (e)
    fzbox.pulse('R', 10000)
    mcu.reset()
# Take the lock in the main thread so the publisher thread blocks until the first release
core_sample_lock.acquire()
try:
    print("adc config...")
    # Config FourZeroBox ADC channels
    fzbox.config_adc_010_420(1, 3, 0)
    fzbox.config_adc_resistive(1, 2, 0)
    # Config FourZeroBox ADC conversion parameters
    fzbox.set_conversion_010_420(1, 0, 100, 0, 0, 100)
    fzbox.set_conversion_resistive(1, -50, ref_table, 5, 0, None)
    print("adc config done")
except Exception as e:
    # ADC configuration failed: print the error and reboot
    print(e)
    mcu.reset()
try:
    print("core init done")
    # Start pub_event_handler thread
    thread(pub_event_handler)
    print('start main')
    # Main Loop
    while True:
        # Shows which firmware is running, to check the result of a FOTA update
        print("Hello! firmware running version", fw_version)
        sleep(10000)
        # Sync between main thread and pub_event_handler thread:
        # release the lock only when the publisher has flagged itself as waiting
        if ready:
            core_sample_lock.release()
        # Reset Watchdog timer
        sfw.kick()
except Exception as e:
    # Main loop failure: print the error, pulse the red LED and reboot
    print (e)
    fzbox.pulse('R', 10000)
    mcu.reset()