# Reading power consumption from a smart meter over the Wi-SUN B-route - sample script [WSUHA_broute_demo_01.py]
# Sample script [WSUHA_broute_demo_01.py]
#
# RS-WSUHA-P function demo program Ver. 6R0
# WSUHA_broute_demo_01.py
#
# Get electric power data from a Smart Meter through the Wi-SUN B-route
#
# 02/June/2023 Copyright RATOC Systems, Inc. Osaka Japan
#
# Important Notice : Use the WOPT command so that ERXUDP data is received in ASCII HEX format before executing this program.
#
import serial
# import time
# from tkinter import Tk, Label
from tkinter import Label # Tk
import tkinter.ttk as ttk
import tkinter as tk
import time
# ECHONET Lite request frame that asks the smart meter for property 0xE7.
# --- header ---
ehd1 = 0x10  # EHD1: ECHONET Lite
ehd2 = 0x81  # EHD2: EDATA format 1
tidh = 0x52  # Transaction ID, high byte ("R")
tidl = 0x53  # Transaction ID, low byte ("S")
# --- source object (this controller) ---
Seoj_x1 = 0x05  # Source EOJ class group code: management/operation device group
Seoj_x2 = 0xFF  # Source EOJ class code: controller class
Seoj_x3 = 0x01  # Source EOJ instance code
# --- destination object (the meter) ---
Deoj_x1 = 0x02  # Destination EOJ class group code: housing-related device group
Deoj_x2 = 0x88  # Destination EOJ class code: low-voltage smart electric energy meter class
Deoj_x3 = 0x01  # Destination EOJ instance code
# --- service and property ---
esv_req = 0x62  # ESV: request to read a property value (answered with ESV 0x72)
opc_req = 0x01  # OPC: number of properties to be read out
epc_req = 0xE7  # EPC: code of the property to be read out
pdc_req = 0x00  # PDC: byte count of the property data
req_cmd = bytes((
    ehd1, ehd2,
    tidh, tidl,
    Seoj_x1, Seoj_x2, Seoj_x3,
    Deoj_x1, Deoj_x2, Deoj_x3,
    esv_req, opc_req, epc_req, pdc_req,
))
def receive_msg(expected_tid=b"5253"):
    """Read lines from the serial port until one the caller should handle.

    Lines are read from the module-level ``ser`` port.  An ``ERXUDP`` line is
    returned only when the transaction ID found in its EDATA field equals
    ``expected_tid``.  ERXUDP lines carrying another TID, or too short to
    contain an EDATA field, are skipped and reading continues.  Every other
    line is returned as soon as it is read, including the empty value that
    ``readline`` produces when the port times out.

    Args:
        expected_tid: TID to accept, given as the four ASCII-hex characters
            located at offset 4 of the EDATA field.  Defaults to ``b"5253"``,
            the TID used by this script's request frame.

    Returns:
        The raw line as bytes (``b""`` on timeout).
    """
    while True:
        rcvmsg = ser.readline()
        if not rcvmsg.startswith(b"ERXUDP"):
            # Events, scan results and timeouts are the caller's business.
            return rcvmsg
        rcv_txt = rcvmsg.strip().split(b" ")
        if len(rcv_txt) < 10:
            # A truncated ERXUDP line has no EDATA field (index 9); indexing
            # it would raise IndexError, so skip it and keep waiting.
            print("Malformed ERXUDP:", rcvmsg)
            continue
        rcv_edata = rcv_txt[9]  # UDP EDATA
        print("EDATA;", rcv_edata, "\r\n")
        rcv_tid = rcv_edata[4:4 + 4]
        print("Receiveed TID:", rcv_tid)
        if rcv_tid == expected_tid:  # is this the answer to our own request?
            return rcvmsg
# Wait SKCOMMAND Echoback from WSUHA
def receive_echoback( skcmd ):
    """Wait for the module to echo back the command that was just sent.

    Lines are read from the module-level ``ser`` port and discarded until one
    starts with ``skcmd``.

    Args:
        skcmd: Command name (bytes) the echoed line must start with.

    Returns:
        0 when the echo was seen, 2 when ``readline`` returned nothing
        (timeout, the module did not echo the command).
    """
    while True:
        echo_line = ser.readline()
        if echo_line == b"":
            # Timeout: no echo arrived.
            return 2
        if echo_line.startswith(skcmd):
            print("Echoback : ", echo_line )
            return 0
# Wait 'OK' from WSUHA
def receive_OK():
    """Wait for the module's answer to the command that was just sent.

    Lines are read from the module-level ``ser`` port and discarded until an
    ``OK`` line arrives.  A line starting with ``FAIL`` ends the wait at once:
    previously it was skipped like any other line, so a rejected command
    blocked until the serial timeout before reporting the same failure code.

    Returns:
        The ``OK`` line (bytes) on success, or the integer 2 on failure
        (timeout or a ``FAIL`` reply).  The mixed return type is kept for
        compatibility with existing callers.
    """
    while True:
        rcvmsg = ser.readline()
        if rcvmsg == b"":
            # Timeout: nothing more to read.
            return 2
        if rcvmsg.startswith(b"OK"):
            print("Ack : ", rcvmsg, "\r\n" )
            return rcvmsg
        if rcvmsg.startswith(b"FAIL"):
            # The command was rejected; no OK will follow.
            print("Nack : ", rcvmsg, "\r\n" )
            return 2
def value_update():
    """Button callback: fetch a fresh reading and show it with the time.

    While the request is in progress the button caption is switched to the
    busy text, and it is switched back afterwards even if the request raises.
    The new reading is also stored in ``prv_pwrstr`` so that ``tick`` keeps
    displaying it between polls.
    """
    global prv_pwrstr  # the only module-level name this function rebinds
    text.set("更新中")
    # get_edata() blocks the Tk event loop while it waits on the serial port,
    # so pending redraws must be flushed now; otherwise the busy caption
    # would be replaced before it is ever painted.
    label.update_idletasks()
    try:
        powstr_1 = get_edata( ipv6Addr, req_cmd )
        prv_pwrstr = powstr_1
        now = time.strftime("%H:%M:%S")
        label.config( text=powstr_1 + " at " + now, font=("Times", '30'))
    finally:
        # Always restore the idle caption so the button never stays "busy".
        text.set("更新")
    return
def _parse_instant_power( rcvmsg ):
    """Extract the E7 reading from an ERXUDP line; return an int or None."""
    cols = rcvmsg.strip().split(b' ')
    if len(cols) < 10:
        return None  # no EDATA field
    res = cols[9]  # UDP EDATA as ASCII hex
    print("EDATA;", res )
    seoj = res[8:8+6]
    print("SEOJ:", seoj )
    ESV = res[20:20+2]
    print("ESV:", ESV )
    # Accept only a read response (72) whose source object is the meter.
    if seoj != b'028801' or ESV != b'72':
        return None
    EPC = res[24:24+2]
    print("EPC:", EPC, "\r\n" )
    if EPC != b'E7':
        return None
    hexPower = res[28:28+8]
    print( "Value:", hexPower, "\r\n")
    if len(hexPower) != 8:
        return None  # truncated frame
    try:
        intPower = int( hexPower, 16 )
    except ValueError:
        return None  # not hexadecimal text
    # The ECHONET Lite smart meter class defines E7 as a signed 32-bit value
    # (negative during reverse power flow), so fold the upper half of the
    # unsigned range back to negative numbers.
    if intPower >= 0x80000000:
        intPower -= 0x100000000
    return intPower


def get_edata( ipv6Addr, req_cmd ):
    """Send one request frame to the meter and return the reading as text.

    The frame is sent with SKSENDTO through the module-level ``ser`` port;
    the data-length field is computed from ``req_cmd`` instead of being fixed
    at ``000E``.  The function then waits for the echo, the ``OK`` and the
    matching ERXUDP answer.

    Args:
        ipv6Addr: Destination address of the meter (bytes).
        req_cmd: Binary ECHONET Lite request frame (bytes).

    Returns:
        The power as a string such as ``"512W"`` (``"-120W"`` for a negative
        reading), or the error code ``"----W"`` when a step times out or the
        answer is not a usable E7 response.
    """
    intpower_str = "----W"  # error code
    # Send Data Request Command
    print("Send Request Command ")
    ser.write(b"SKSENDTO 1 " + ipv6Addr + b" 0E1A 1 0 " + b"%04X" % len(req_cmd) + b" ")
    ser.write( req_cmd )
    skcmd = b"SKSENDTO"
    # Give up as soon as one step fails: each helper has already waited for
    # the full serial timeout, so nothing else is pending for this request.
    if receive_echoback( skcmd ) != 0:
        return intpower_str
    rtn_cd = receive_OK()
    if not isinstance(rtn_cd, bytes) or not rtn_cd.startswith(b"OK"):
        return intpower_str
    # Get UDP packet that includes EDATA
    rcvmsg = receive_msg()  # ERXUDP
    print( rcvmsg,"\r\n")
    if rcvmsg.startswith(b"ERXUDP") :
        print( "ERXUDP received ", "\r\n")
        intPower = _parse_instant_power( rcvmsg )
        if intPower is not None:
            intpower_str = str(intPower)+"W"
    return intpower_str
def tick():
    """Refresh the label once per second and poll the meter once per minute.

    The function reschedules itself with ``label.after(1000, tick)``.  Because
    that delay is a minimum and callbacks can block on the serial port, the
    sampled clock can jump over second "00"; polling only when the second
    equals "00" would then skip a whole minute.  The meter is therefore polled
    on the first tick that observes a new minute.  The very first call only
    records the current minute and shows the previously stored reading.

    Returns:
        The time string (HH:MM:SS) that was put on the label.
    """
    global prv_pwrstr  # the only module-level name this function rebinds
    minute_now = time.strftime("%H:%M")
    minute_seen = getattr(tick, "_last_minute", None)
    tick._last_minute = minute_now
    if minute_seen is not None and minute_now != minute_seen:
        powstr_1 = get_edata( ipv6Addr, req_cmd )
        prv_pwrstr = powstr_1
    else:
        powstr_1 = prv_pwrstr
    # Re-read the clock: get_edata() may have taken several seconds.
    now = time.strftime("%H:%M:%S")
    label.config(
        text=powstr_1 + " at " + now , font=("Times",'30'))
    label.after(1000, tick)
    return now
# Create the main window and place it relative to the screen size:
# 450 px in from the right edge and 250 px up from the bottom edge.
root = tk.Tk()
w = root.winfo_screenwidth() - 450
h = root.winfo_screenheight() - 250
root.title("瞬間消費電力量")
root.geometry("+".join(["400x150", str(w), str(h)]))
# Open the Wi-SUN module and register the B-route credentials.
COM = "COM3"  # RS-WSUHA-P on Windows11 / check the COM port number in Device Manager
bitRate = 115200
CRLF = b"\r\n"
# Authentication password: replace with the password issued by the power
# transmission/distribution company.
rbpwd = "02xxxxUAxxxx"
# Authentication ID: replace with the ID issued by the power
# transmission/distribution company.
rbid = "0000009A05xxxx88400050xxxxxxD4C0"
ser = serial.Serial(COM, bitRate, timeout=10)  # init Serial Port
print("\r\nWSUHA & SmartMeter communication test 6R0\r\n ")

# Reset the WSUHA command buffer.
skcmd = b"SKRESET"
ser.write(skcmd + CRLF)
rtn_cd = receive_echoback( skcmd )  # get ECHOBACK
rtn_cd = receive_OK()  # get 'OK'

# Optional firmware version check (disabled in this sample): send
# b'SKVER\r\n', then read the echo line and the version line.

# Register the B-route authentication password, then the authentication ID.
# Each entry: (command name to expect in the echo, command head, credential).
for skcmd, _cmd_head, _credential in (
    (b"SKSETPWD", b'SKSETPWD C ', rbpwd),
    (b"SKSETRBID", b'SKSETRBID ', rbid),
):
    ser.write(_cmd_head)
    ser.write(_credential.encode('utf-8'))
    ser.write(CRLF)
    rtn_cd = receive_echoback( skcmd )  # get ECHOBACK
    rtn_cd = receive_OK()  # get 'OK'
# B-route active scan to find the smart meter.
# The scan is repeated with a longer duration each time until a response
# containing a channel is collected.  If the longest allowed scan still
# finds nothing, the script stops with a message instead of failing later
# with a KeyError on the missing scan fields.
scanduration = 6        # duration parameter of the first scan
max_scanduration = 8    # give up after a scan with this duration
scan_data = {}          # fields reported by the scan, keyed by field name
while b"Channel" not in scan_data:
    if scanduration > max_scanduration:
        print("SmartMeter not found.", "\r\n")
        ser.close()
        raise SystemExit(1)
    print("\r\nScan start to detect SmartMeter\r\n")
    ser.write(b"SKSCAN 2 FFFFFFFF %X 0 \r\n" % scanduration)
    skcmd = b"SKSCAN"
    rtn_cd = receive_echoback( skcmd )  # get ECHOBACK
    rtn_cd = receive_OK()  # get 'OK'
    scan_loop = True
    while scan_loop :
        rcvmsg = receive_msg()
        print("Scan Result:", rcvmsg, "\r\n", end="")
        if rcvmsg.startswith(b"EVENT 22") :
            # The scan has ended.
            scan_loop = False
        elif rcvmsg.startswith(b" ") :
            # Scan results arrive as indented "name:value" lines, e.g.
            #   Channel:39
            #   Channel Page:09
            #   Pan ID:FFFF
            #   Addr:FFFFFFFFFFFFFFFF
            #   LQI:A7
            #   PairID:FFFFFFFF
            cols = rcvmsg.strip().split(b':', 1)
            if len(cols) == 2:  # ignore indented lines without a value
                scan_data[cols[0]] = cols[1]
    scanduration += 1
# Configure the module from the scan result and join the meter's network.
# Stop with a clear message when the scan did not deliver a required field;
# indexing scan_data would otherwise raise KeyError.
for _field in (b"Channel", b"Pan ID", b"Addr"):
    if _field not in scan_data:
        print("Scan result has no", _field, "\r\n")
        ser.close()
        raise SystemExit(1)
# Take the channel from the scan result and set it in register S2.
ser.write(b"SKSREG S2 " + scan_data[b"Channel"] + b"\r\n")
skcmd = b"SKSREG"
rtn_cd = receive_echoback( skcmd )  # get ECHOBACK
rtn_cd = receive_OK()  # get 'OK'
# Take the Pan ID from the scan result and set it in register S3.
ser.write(b"SKSREG S3 " + scan_data[b"Pan ID"] + b"\r\n")
skcmd = b"SKSREG"
rtn_cd = receive_echoback( skcmd )  # get ECHOBACK
rtn_cd = receive_OK()  # get 'OK'
# Convert the 64-bit MAC address to an IPv6 address.
ser.write(b"SKLL64 " + scan_data[b"Addr"] + b"\r\n")
skcmd = b"SKLL64"
rtn_cd = receive_echoback( skcmd )  # get ECHOBACK
ipv6Addr = ser.readline().strip()  # the line after the echo is the address
print("IPv6 Address:", ipv6Addr, "\r\n")
# Start the PANA connection sequence.
ser.write(b"SKJOIN " + ipv6Addr + b"\r\n")
skcmd = b"SKJOIN"
rtn_cd = receive_echoback( skcmd )  # get ECHOBACK
rtn_cd = receive_OK()  # get 'OK'
# Wait until the PANA connection completes (EVENT 25) or fails (EVENT 24).
bConnected = False
while not bConnected :
    rcvmsg = receive_msg()
    print(rcvmsg, "\r\n")
    if rcvmsg.startswith(b"EVENT 24") :
        print("PANA Connection failed.", "\r\n")
        # The file never imports sys, so sys.exit() raised NameError here.
        # Release the port and leave with a non-zero exit status instead.
        ser.close()
        raise SystemExit(1)
    elif rcvmsg.startswith(b"EVENT 25") :
        print("PANA Connection completed.", "\r\n" )
        bConnected = True
# Build the GUI, take the first reading and run the Tk main loop.
# Shorter serial timeout from here on, so a missing answer does not stall
# the GUI for long.
ser.timeout = 3
# Main frame; widgets are packed top to bottom: Label -> Button.
frame = ttk.Frame(root)
frame.pack(fill = tk.BOTH, padx=20,pady=10)
# StringVar holding the button caption, so value_update can change it.
text = tk.StringVar(frame)
text.set("更新")
button = tk.Button(frame, textvariable=text, font=("Times", "15", "bold"), command=value_update)
# Everything after this point talks to the serial port or runs the event
# loop; the finally clause makes sure the port is closed on any exit path.
try:
    powstr = get_edata( ipv6Addr, req_cmd )
    prv_pwrstr = powstr  # last reading, shown by tick() between polls
    now = time.strftime("%H:%M:%S")
    label = Label(frame, text=powstr +" at " + now , font=("Times", '30'))
    print("Loop Start","\r\n")
    now = tick()  # starts the once-per-second refresh
    label.pack()
    button.pack()
    # main loop
    root.mainloop()
finally:
    ser.close()
print("WSUHWSUHA B-route test END")
# Script (zip) download - e2eStore member login is required.