# Soil sensor data acquisition - sample script [soil_sensor_mgr02.py]
# Sample script [soil_sensor_mgr02.py]
"""Murata Soil Sensor SL5006 and RATOC Systems RS61G or USB60FP evaluation program.

Version history:
    1R0  05/Feb./2023  by C.Okamura, RATOC Systems, Inc.
    2R0  23/Feb./2023  Added Excel file access code

Copyright 2023 RATOC Systems, Inc. All rights reserved.

Running on Windows 11 / Python 3.11.1
"""
# -*- coding: utf-8 -*-
import datetime
import time

# COM = "COM4"  # USB60F
COM = "COM5"  # SG61
bitRate = 9600

# Serial port handle shared by send_cmd() / receive_response().
# It is opened by main(), so importing this module does not touch hardware.
ser = None

wrtcmd = 2          # command for writing data to a register
readcmd = 1         # command for reading data from a register
SNSR_CTR_reg = 7    # Sensor Control register
SNSR_STS_reg = 8    # Sensor Status register
FW_ver_reg = 0      # F/W version register, 3 bytes
SER_NUM_reg = 3     # Serial Number register, 4 bytes
TEMP_register = 19  # Temperature data register, 2 bytes (0x13)
EC_BULK_reg = 21    # EC BULK data register, 2 bytes (0x15)
VWC_reg = 25        # VWC data register, 2 bytes (0x19)
EC_PORE_reg = 31    # EC_PORE data register, 2 bytes (0x1f)

# Index of the first payload byte in a response (after the 3-byte header).
_DATA_START = 3
# Number of bytes fetched in one read starting at TEMP_register.
_MEASUREMENT_BYTES = 16

_XLSX_FILE = 'sl5006_data_01.xlsx'
_XLSX_SHEET = 'Sheet1'  # default sheet name
_ROW_COUNTER_CELL = 'G3'  # holds the row number written most recently
_LAST_DATA_ROW = 87       # once this row is written the counter wraps
_ROW_BEFORE_TOP = 5       # counter value that makes the next write hit row 6
_TOP_DATA_ROW = 6


def crc16_Modbus(data: bytearray, offset, length):
    """Return the CRC16 (Modbus) of ``data[offset:offset + length]``.

    The generator polynomial is x^16 + x^15 + x^2 + 1 (0x8005); the loop
    works LSB-first, so the bit-reversed constant 0xA001 is used.

    Returns:
        Two bytes, high byte first, or the integer 0 when the arguments do
        not describe a valid range inside ``data``.
    """
    if (
        data is None
        or offset < 0
        or length < 0
        or offset > len(data) - 1
        or offset + length > len(data)
    ):
        return 0
    crc = 0xFFFF
    for value in data[offset:offset + length]:
        crc ^= value
        for _ in range(8):
            if crc & 0x0001:  # check LSB
                crc = (crc >> 1) ^ 0xA001
            else:
                crc >>= 1
    return crc.to_bytes(2, 'big')


def receive_response():
    """Receive one response message from the SLT5006 soil sensor.

    Reads a 3-byte header first.  When the first byte is >= 0x80 the message
    is an error response and one more byte is read (4 bytes in total);
    otherwise the third header byte is taken as the data byte count and that
    many bytes plus 2 are read.

    Returns:
        The complete message (header + remainder) as bytes.

    Raises:
        TimeoutError: the port returned fewer bytes than expected, i.e. the
            read timed out.
    """
    rcv_hdr = ser.read(3)  # get header
    if len(rcv_hdr) < 3:
        raise TimeoutError("No complete response header from the sensor")
    if rcv_hdr[0] >= 0x80:
        remaining = 1  # header byte 1 is the error code; one byte follows
    else:
        remaining = rcv_hdr[2] + 2  # data byte count + 2 trailing bytes
    rcv_tail = ser.read(remaining)
    if len(rcv_tail) < remaining:
        raise TimeoutError("Sensor response was truncated")
    return rcv_hdr + rcv_tail


def send_cmd(tx_cmd):
    """Send a command message to the SLT5006."""
    ser.write(tx_cmd)
    return


def _build_cmd(f_code, register, *payload):
    """Return ``f_code, register, *payload`` followed by its CRC16 bytes."""
    body = bytes([f_code, register, *payload])
    return body + crc16_Modbus(body, 0, len(body))


def _report_if_error(rcv_resp):
    """Print and flag an error response (a message exactly 4 bytes long)."""
    if len(rcv_resp) == 4:
        print("Error Code :", rcv_resp[1])
        print(rcv_resp)
        return True
    return False


def _read_register(register, size):
    """Read ``size`` bytes starting at ``register``.

    Returns the raw response, or None when the sensor answered with an
    error response (which has already been printed).
    """
    send_cmd(_build_cmd(readcmd, register, size))
    rcv_resp = receive_response()  # wait for the response from the sensor
    if _report_if_error(rcv_resp):
        return None
    return rcv_resp


def _u16le(buf, index):
    """Return the 16-bit value stored low byte first at ``buf[index]``."""
    return (buf[index + 1] << 8) | (buf[index] & 0x00FF)


def _decode_temperature(raw):
    """Convert the raw temperature word to a value in 0.0625 steps."""
    if raw >= 0x8000:
        # NOTE(review): assumes a word with bit 15 set is a sign-extended
        # 16-bit two's complement value - confirm against the datasheet.
        # The earlier 12-bit-only handling turned such words into large
        # positive temperatures.
        raw -= 0x10000
    elif raw >= 0x800:
        raw -= 0x1000  # 12-bit two's complement
    return raw * 0.0625


def Tx_start_cmd():
    """Send the start command to the sensor.

    Writes one byte of value 1 to the Sensor Control register.

    Returns:
        1 on a normal response, -1 when the sensor returned an error.
    """
    tx_cmd = _build_cmd(wrtcmd, SNSR_CTR_reg, 1, 1)  # b"\x02\x07\x01\x01" + CRC
    send_cmd(tx_cmd)
    rcv_resp = receive_response()
    if _report_if_error(rcv_resp):
        return -1
    return 1


def Setup_status_check_cmd():
    """Build the command that reads one byte of the Sensor Status register."""
    return _build_cmd(readcmd, SNSR_STS_reg, 1)  # b"\x01\x08\x01" + CRC


def Check_sensor_status(schk_cmd):
    """Send the status check command and interpret the answer.

    Returns:
        1 when the status byte is 0x01 ("Data Ready"),
        0 when it is 0 ("Data not Ready"),
        -1 on an error response or any other status value.
    """
    send_cmd(schk_cmd)
    rcv_resp = receive_response()
    if _report_if_error(rcv_resp):
        # In an error response byte 3 is not a status byte, so stop here.
        return -1
    status = rcv_resp[_DATA_START]
    if status == 0x01:
        print("Data Ready")
        return 1
    if status == 0:
        print("Data not Ready")
        return 0
    print("Unexpected status :", status)
    return -1


def _append_to_workbook(temp_dt, ec_bulk, vwc, ec_pore, comment):
    """Write one measurement row to the Excel file and advance the counter."""
    # Imported here so that the protocol helpers above can be imported on a
    # machine that does not have openpyxl installed.
    import openpyxl

    wb = openpyxl.load_workbook(_XLSX_FILE)
    try:
        sheet = wb[_XLSX_SHEET]
        # Pick up the current bottom row number and derive the target row.
        nextrow = sheet[_ROW_COUNTER_CELL].value + 1
        print("Next row :", nextrow)
        # One clock read, so date and time cannot straddle midnight.
        now = datetime.datetime.now()
        row_values = (now.date(), now.time(), temp_dt, ec_bulk, vwc,
                      ec_pore, comment)
        for column, value in enumerate(row_values, start=1):
            sheet.cell(row=nextrow, column=column).value = value
        if nextrow >= _LAST_DATA_ROW:  # 3 data/day for 31 days
            nextrow = _ROW_BEFORE_TOP  # next write goes to the top line
        sheet[_ROW_COUNTER_CELL].value = nextrow
        # When the top line was written, also record F/W version and serial no.
        if nextrow == _TOP_DATA_ROW:
            fw_rev = Get_fwrev()
            ser_no = Get_sno()
            sheet['B2'].value = fw_rev
            sheet['B3'].value = ser_no
        wb.save(_XLSX_FILE)
    finally:
        wb.close()


def Get_sensor_data():
    """Read the measurement block, print it and append it to the Excel file.

    Sixteen bytes are read starting at the temperature register; the
    temperature, EC_BULK, VWC and EC_PORE words are taken from it.

    Returns:
        1 on success, -1 when the sensor returned an error response.
    """
    rcv_resp = _read_register(TEMP_register, _MEASUREMENT_BYTES)
    if rcv_resp is None:
        return -1

    def word(register):
        # Registers are laid out consecutively from TEMP_register onward.
        return _u16le(rcv_resp, _DATA_START + register - TEMP_register)

    temp_dt = _decode_temperature(word(TEMP_register))
    print("Temparature :", temp_dt)
    ec_bulk = word(EC_BULK_reg) * 0.001
    print("EC_BULK :", ec_bulk)
    vwc = word(VWC_reg) * 0.1
    print("VWC :", vwc)
    ec_pore = word(EC_PORE_reg) * 0.001
    print("EC_PORE :", ec_pore)

    _append_to_workbook(temp_dt, ec_bulk, vwc, ec_pore, "new data")
    return 1


def Get_fwrev():
    """Read the firmware version.

    Returns:
        The three version bytes as "major.minor.revision", or None when the
        sensor returned an error response.
    """
    rcv_resp = _read_register(FW_ver_reg, 3)
    if rcv_resp is None:
        return None
    fw_ver_mjr = rcv_resp[3]
    fw_ver_min = rcv_resp[4]
    fw_rev = rcv_resp[5]
    return f"{fw_ver_mjr}.{fw_ver_min}.{fw_rev}"


def Get_sno():
    """Read the serial number.

    Returns:
        The four serial-number bytes combined low byte first into an int,
        or None when the sensor returned an error response.
    """
    rcv_resp = _read_register(SER_NUM_reg, 4)
    if rcv_resp is None:
        return None
    return (
        (rcv_resp[6] << 24)
        | (rcv_resp[5] << 16)
        | (rcv_resp[4] << 8)
        | rcv_resp[3]
    )


def main():
    """Run one measurement cycle: identify, start, wait, read, store."""
    global ser
    # Imported here so that the module can be imported without pyserial.
    import serial

    # Open the serial port.
    ser = serial.Serial(COM, bitRate, timeout=10)
    try:
        print("Soil Data mesuring program V. 1R0")
        fw_rev = Get_fwrev()
        print("F/W version :", fw_rev)
        ser_no = Get_sno()
        print("Serial No. :", ser_no)

        print("Sampling Start and wait Ready status")
        rtn_cd = Tx_start_cmd()
        if rtn_cd == 1:
            print("Samling start successfully ")
        # NOTE(review): as in the original script, the flow continues even
        # when the start command was rejected - confirm whether it should
        # abort instead.

        schk_cmd = Setup_status_check_cmd()  # set up Status Check command
        while True:
            rtn_cd = Check_sensor_status(schk_cmd)  # send command
            time.sleep(2)  # wait 2 sec.
            if rtn_cd != 0:
                break

        print("Get Data command")
        rtn_cd = Get_sensor_data()
        print("End")
    finally:
        ser.close()


if __name__ == "__main__":
    main()