土壌センサのデータ取得 – サンプルスクリプト[soil_sensor_mgr02.py]
サンプルスクリプト [soil_sensor_mgr02.py]
"""
Murata Soil Sensor SLT5006 and RATOC Systems RS61G or USB60FP
Evaluation Program V 1R0 05/Feb./2023 by C.Okamura RATOC Systems, Inc.
2R0 23/Feb./2023 Added Excel file access code
Copyright 2023 RATOC Systems, Inc. All rights reserved.
running on Windows11 / Python 3.11.1
"""
# -*- coding: utf-8 -*-
import serial
import time
import datetime
import openpyxl
# Serial port name of the adapter the sensor is attached to (pick one line).
# COM="COM4" #USB60F
COM="COM5" #SG61
bitRate=9600
# Open Serial port (timeout=10 is pySerial's read timeout; a read that times
# out returns fewer bytes than requested instead of raising)
ser = serial.Serial(COM, bitRate, timeout=10 )
# Command codes and register addresses of the sensor.
# NOTE: the command functions in this file hard-code the same numeric values
# instead of referencing these names.
wrtcmd = 2 # command for write data to register
readcmd = 1 # command for read data from register
SNSR_CTR_reg = 7 # Sensor Control register
SNSR_STS_reg = 8 # Sensor Status register
FW_ver_reg = 0 # F/W version register 3 bytes
SER_NUM_reg = 3 # Serial Number register 4 bytes
TEMP_register = 19 # Temperature data register 2 bytes 0x13
EC_BULK_reg = 21 # EC BULK data register 2 bytes 0x15
VWC_reg = 25 # VWC data register 2 bytes 0x19
EC_PORE_reg = 31 # EC_PORE data register 2 bytes 0x1f
# CRC16 Modbus calculation
# Modbus-RTU polynomial 0x8005: X16 + X15 + X2 + X0 = 1 1000 0000 0000 0101 --> reverse bit order --> 0xA001
def crc16_Modbus(data : bytearray, offset , length):
    """Compute the CRC-16/MODBUS checksum over ``data[offset:offset+length]``.

    The CRC is computed bit by bit with the reflected polynomial 0xA001 and
    an initial value of 0xFFFF.

    Args:
        data: Indexable sequence of byte values (``bytes`` or ``bytearray``).
        offset: Index of the first byte to include.
        length: Number of bytes to include.

    Returns:
        The 16-bit CRC as two bytes, most significant byte first, or the
        integer ``0`` when ``data`` is ``None`` or the requested range does
        not lie inside ``data``.
    """
    # The original guard combined its range checks with ``and``, so an
    # over-long ``length`` on a valid ``offset`` slipped through and raised
    # IndexError in the loop below. Checking the end of the range is enough:
    # it also rejects any ``offset`` beyond the data.
    if data is None or offset < 0 or offset + length > len(data):
        return 0
    crc = 0xFFFF
    for i in range(length):
        crc ^= data[offset + i]
        for _ in range(8):
            if crc & 0x0001:  # LSB set: shift and apply the polynomial
                crc = (crc >> 1) ^ 0xA001
            else:
                crc >>= 1
    return crc.to_bytes(2, 'big')
# Receive message and data from SLT5006 Soil Sensor
def receive_response():
    """Read one complete response frame from the sensor over ``ser``.

    The first three bytes are read as a header. If the first byte (response
    code) is 0x80 or above, the frame is an error response: the second
    header byte is the error code and one further byte completes the
    4-byte frame. Otherwise the third header byte is the data byte count and
    that many bytes plus two CRC bytes follow.

    Returns:
        The whole frame (header plus remainder) as ``bytes``.

    Raises:
        TimeoutError: If the port's read timeout expires before the full
            frame has arrived. Previously a short read surfaced as an
            IndexError here or in the caller.
    """
    rcv_hdr = ser.read( 3 )  # get header
    if len(rcv_hdr) < 3:
        raise TimeoutError(
            "Timed out waiting for response header (got %d of 3 bytes)"
            % len(rcv_hdr))
    if rcv_hdr[0] >= 0x80:
        remaining = 1               # error frame: one trailing byte
    else:
        remaining = rcv_hdr[2] + 2  # data bytes + 2 CRC bytes
    rcv_tail = ser.read( remaining )
    if len(rcv_tail) < remaining:
        raise TimeoutError(
            "Timed out waiting for response body (got %d of %d bytes)"
            % (len(rcv_tail), remaining))
    return rcv_hdr + rcv_tail
# Send Command to SLT5006
def send_cmd( tx_cmd ):
    """Write the command frame ``tx_cmd`` to the open serial port ``ser``."""
    ser.write(tx_cmd)
# Send Start command for SL5006
def Tx_start_cmd( ):
    """Send the start command to the sensor and check its reply.

    The frame writes the single data byte 1 to register 7 (Sensor Control)
    using command code 2 (write), followed by the two CRC bytes.

    Returns:
        1 when the reply is not a 4-byte error frame, -1 when it is (the
        error code and the raw reply are printed).
    """
    # command code, register, byte count, data -> b"\x02\x07\x01\x01"
    payload = bytes([2, 7, 1, 1])
    checksum = crc16_Modbus(payload, 0, 4)
    frame = payload + bytes([checksum[0], checksum[1]])

    send_cmd(frame)
    reply = receive_response()

    # receive_response() yields exactly 4 bytes for an error response.
    if len(reply) != 4:
        return 1
    print( "Error Code :", reply[1] )
    print( reply)
    return -1
# Check Sensor Status. If Data_Ready then return, else continue to check the status.
def Setup_status_check_cmd( ):
    """Build the command frame that reads the Sensor Status register.

    Returns:
        Five bytes: command code 1 (read), register 8, byte count 1, then
        the two CRC bytes in the order crc16_Modbus returns them.
    """
    header = bytes([1, 8, 1])  # b"\x01\x08\x01"
    crc_first, crc_second = crc16_Modbus(header, 0, 3)
    return header + bytes([crc_first, crc_second])
def Check_sensor_status( schk_cmd ):
    """Send the status-read command once and classify the sensor's reply.

    Args:
        schk_cmd: Command frame as built by Setup_status_check_cmd().

    Returns:
        1 if the status byte is 0x01 ("Data Ready"), 0 if it is 0
        ("Data not Ready"), -1 for a 4-byte error response or any other
        status value.
    """
    send_cmd( schk_cmd )
    rcv_resp = receive_response()
    if len( rcv_resp ) == 4:
        print( "Error Code :", rcv_resp[1] )
        print( rcv_resp)
        # Return right away: in an error frame byte 3 is not a status byte,
        # and testing it (as the old code did) could overwrite the -1.
        return -1
    status = rcv_resp[3]
    if status == 0x01:
        print( "Data Ready")
        return 1
    if status == 0:
        print( "Data not Ready")
        return 0
    # Any other value used to leave the return variable unassigned.
    print( "Unexpected sensor status :", status )
    return -1
# Get data from Soil Sensor.
def _u16_le(frame, index):
    """Return the 16-bit value stored low byte first at ``frame[index]``."""
    return (frame[index + 1] << 8) | frame[index]


def _decode_sensor_frame(rcv_resp):
    """Convert a 16-byte data response into (temp, ec_bulk, vwc, ec_pore).

    Data bytes start at index 3 of the frame and the read starts at
    register 19, so register R is found at index ``3 + (R - 19)``.
    """
    temp = _u16_le(rcv_resp, 3)       # register 19
    # The raw value is treated as 12-bit two's complement, 0.0625 per count.
    # NOTE(review): if the sensor reports a 16-bit two's-complement value,
    # negative temperatures are decoded wrongly here - confirm against the
    # sensor datasheet.
    if temp >= 0x800:
        temp_dt = ((0x1000 - temp ) * -1 ) * 0.0625
    else:
        temp_dt = temp * 0.0625
    ec_bulk = _u16_le(rcv_resp, 5) * 0.001    # register 21
    vwc = _u16_le(rcv_resp, 9) * 0.1          # register 25
    ec_pore = _u16_le(rcv_resp, 15) * 0.001   # register 31
    return temp_dt, ec_bulk, vwc, ec_pore


def _log_measurement(xlsx_path, temp_dt, ec_bulk, vwc, ec_pore, comment):
    """Write one measurement row into 'Sheet1' of the workbook and save it.

    Cell G3 holds the number of the last row written; the new record goes
    to the row after it and G3 is updated afterwards.
    """
    wb = openpyxl.load_workbook(xlsx_path)
    try:
        sheet = wb['Sheet1']  # select default Sheet name.
        last_row = sheet['G3'].value
        if last_row is None:
            # Empty counter cell: start so that the first record lands on
            # row 6, the same row used after a wrap-around below.
            last_row = 5
        nextrow = last_row + 1
        print("Next row :", nextrow )
        # Read the clock once so date and time cannot straddle midnight.
        now = datetime.datetime.now()
        row_values = (now.date(), now.time(),
                      temp_dt, ec_bulk, vwc, ec_pore, comment)
        for column, value in enumerate(row_values, start=1):
            sheet.cell( row = nextrow, column = column ).value = value
        # Original comment: "3data/day for 31days".
        # NOTE(review): rows 6..87 hold 82 records, not 3 x 31 = 93 - confirm.
        if nextrow >= 87:
            nextrow = 5  # wrap: the next record goes to the top line (row 6)
        sheet['G3'].value = nextrow
        # When the top line was just written, refresh F/W version and
        # serial number in the sheet header.
        if nextrow == 6:
            fw_rev = Get_fwrev()
            ser_no = Get_sno()
            sheet['B2'].value = fw_rev
            sheet['B3'].value = ser_no
        wb.save(xlsx_path)
    finally:
        wb.close()


def Get_sensor_data(xlsx_path='sl5006_data_01.xlsx'):
    """Read the measurement registers, print the values and log them to Excel.

    Sends a read command (code 1) for 16 bytes starting at register 19,
    decodes temperature, EC_BULK, VWC and EC_PORE from the reply, prints
    them, and appends them with the current date and time to the workbook.

    Args:
        xlsx_path: Workbook to update. Defaults to the file name that was
            previously hard-coded.

    Returns:
        1 on success, -1 if the sensor answered with a 4-byte error
        response (nothing is decoded or written in that case).
    """
    payload = bytes([1, 19, 16])  # b"\x01\x13\x10"
    crc16mb = crc16_Modbus(payload, 0, len(payload))
    send_cmd( payload + bytes([crc16mb[0], crc16mb[1]]) )
    # wait response from Soil Sensor
    rcv_resp = receive_response()
    if len( rcv_resp ) == 4:
        print( "Error Code :", rcv_resp[1] )
        print( rcv_resp)
        return -1

    temp_dt, ec_bulk, vwc, ec_pore = _decode_sensor_frame(rcv_resp)
    print( "Temparature :", temp_dt )
    print( "EC_BULK :", ec_bulk )
    print( "VWC :", vwc )
    print( "EC_PORE :", ec_pore )

    _log_measurement(xlsx_path, temp_dt, ec_bulk, vwc, ec_pore, "new data")
    return 1
# Get firmware version and serial number.
def Get_fwrev( ):
    """Read the 3-byte F/W version register and format it as a string.

    Sends a read command (code 1) for 3 bytes starting at register 0.

    Returns:
        The three data bytes as decimal numbers joined by dots, in the
        order received (e.g. "1.2.3"), or ``None`` if the sensor answered
        with a 4-byte error response. The error path previously fell
        through and crashed.
    """
    payload = bytes([1, 0, 3])  # b"\x01\x00\x03"
    crc16mb = crc16_Modbus(payload, 0, len(payload))
    send_cmd( payload + bytes([crc16mb[0], crc16mb[1]]) )
    # wait response from Soil Sensor
    rcv_resp = receive_response()
    if len( rcv_resp ) == 4:
        print( "Error Code :", rcv_resp[1] )
        print( rcv_resp)
        return None
    # Data bytes follow the 3-byte header.
    fw_ver_mjr = rcv_resp[3]
    fw_ver_min = rcv_resp[4]
    fw_rev = rcv_resp[5]
    return str(fw_ver_mjr) + "." + str(fw_ver_min) + "." + str(fw_rev)
# Get serial number.
def Get_sno( ):
    """Read the 4-byte serial number register and return it as an integer.

    Sends a read command (code 1) for 4 bytes starting at register 3.

    Returns:
        The serial number assembled from the four data bytes, least
        significant byte first, or ``None`` if the sensor answered with a
        4-byte error response. The error path previously fell through and
        crashed.
    """
    payload = bytes([1, 3, 4])  # b"\x01\x03\x04"
    crc16mb = crc16_Modbus(payload, 0, len(payload))
    send_cmd( payload + bytes([crc16mb[0], crc16mb[1]]) )
    # wait response from Soil Sensor
    rcv_resp = receive_response()
    if len( rcv_resp ) == 4:
        print( "Error Code :", rcv_resp[1] )
        print( rcv_resp)
        return None
    # Data bytes follow the 3-byte header; byte 3 is the least significant.
    return (
        (rcv_resp[6] << 24)
        | (rcv_resp[5] << 16)
        | (rcv_resp[4] << 8)
        | rcv_resp[3]
    )
# start here
# Script flow: report F/W version and serial number, start a measurement,
# poll the status register every 2 seconds until it stops reporting
# "not ready", then fetch and log the data. The try/finally ensures the
# serial port is closed even if one of the steps raises.
try:
    print( "Soil Data mesuring program V. 1R0")
    fw_rev = Get_fwrev()
    print( "F/W version :",fw_rev)
    ser_no = Get_sno()
    print("Serial No. :", ser_no)
    print( "Sampling Start and wait Ready status")
    rtn_cd = Tx_start_cmd()
    if rtn_cd == 1:
        print("Samling start successfully ")
        schk_cmd = Setup_status_check_cmd( )  # Set up Status Check command
        while True:
            rtn_cd = Check_sensor_status( schk_cmd )  # send command
            time.sleep( 2 )  # Wait 2sec.
            if rtn_cd != 0:  # 0 means "not ready": keep polling
                break
        # Only read the data registers when the sensor reported ready; an
        # error status (-1) previously also fell through to the data read.
        if rtn_cd == 1:
            print("Get Data command")
            rtn_cd = Get_sensor_data()
    print("End")
finally:
    ser.close()