スマートメータからWi-SUN Bルートで電力量取得 – サンプルスクリプト[WSUHA_broute_demo_01.py]
サンプルスクリプト [WSUHA_broute_demo_01.py]
# # RS-WSUHA-P function demo program Ver. 6R0 # WSUHA_broute_demo=01.py # # Get Electric Power data from Snart Neter through WiSUN B route # # 02/June/2023 Copyright RATOC Systems, Inc. Osaka Japan # # Important Notice : Use WOPT command to receive EUDX DATA in HEX format befor execute this program. # import serial # import time # from tkinter import Tk, Label from tkinter import Label # Tk import tkinter.ttk as ttk import tkinter as tk import time # Econet Lite DATA frame ehd1 = 0x10 # Econet Lite ehd2 = 0x81 # EDATA format 1 tidh = 0x52 # Transaction ID H "R" tidl = 0x53 # Transaction ID L "S" Seoj_x1 = 0x05 # Source EOJ Class Group Code 管理操作機器グループ Seoj_x2 = 0xFF # Source EOJ Class Code コントローラクラス Seoj_x3 = 0x01 # Source EOJ Instance Code Deoj_x1 = 0x02 # Destination EOJ Class Group Code 住宅関連機器グループ Deoj_x2 = 0x88 # Destination EOJ Class Code 低圧スマート電力量メータクラス Deoj_x3 = 0x01 # Destination EOJ Instance Code esv_req = 0x62 # Econet Lite Service Code Request to read out property value and send back with esv(0x72) opc_req = 0x01 # Number of property counter be red out. epc_req = 0xE7 # Econet Property Counter name. pdc_req = 0x00 # Property Data Byte count. req_cmd = bytes([ehd1,ehd2,tidh,tidl,Seoj_x1,Seoj_x2,Seoj_x3,Deoj_x1,Deoj_x2,Deoj_x3,esv_req,opc_req,epc_req,pdc_req]) def receive_msg(): lpcont = 1 while lpcont: rcvmsg = ser.readline() if rcvmsg.startswith(b"ERXUDP") : rcv_txt = rcvmsg.strip().split(b' ') rcv_edata = rcv_txt[9] # UDP EDATA print("EDATA;", rcv_edata, "\r\n" ) rcv_tid = rcv_edata[4:4+4]; print("Receiveed TID:", rcv_tid ) if rcv_tid == b'5253' : # check my TID ? lpcont = 0 else: lpcpnt =1 else: lpcont = 0 return rcvmsg # Wait SKCOMMAND Echoback from WSUHA def receive_echoback( skcmd ): lpcont = 1 while lpcont: rcv_echobk = ser.readline() if rcv_echobk == b"": # Time Out, WSUHA did not send back SKCOMMAND lpcont = 2 return lpcont if rcv_echobk.startswith( skcmd ) : print("Echoback : ", rcv_echobk ) lpcont = 0 else: lpcont = 1 return lpcont # Wait 'OK' from WSUHA def receive_OK(): lpcont = 1 while lpcont: rcvmsg = ser.readline() if rcvmsg == b'': lpcont = 2 # Time out Error return lpcont if rcvmsg.startswith(b"OK") : print("Ack : ", rcvmsg, "\r\n" ) lpcont = 0 else: lpcont = 1 return rcvmsg def value_update(): text.set("更新中") # time.sleep(5) # Display New value global label global ipv6Addr global req_cmd global prv_pwrstr powstr_1= get_edata( ipv6Addr, req_cmd ) prv_pwrstr = powstr_1 now = time.strftime("%H:%M:%S") label.config( text=powstr_1 + " at " + now, font=("Times", '30')) # label.pack() #time.sleep( 5 ) text.set("更新") return def get_edata( ipv6Addr, req_cmd ): # Send Data Request Command print("Send Request Command ") ser.write(b"SKSENDTO 1 " + ipv6Addr + b" 0E1A 1 0 000E ") ser.write( req_cmd ) skcmd = b"SKSENDTO" rtn_cd = receive_echoback( skcmd ) # get ECHOBACK rtn_cd = receive_OK() # get 'OK' # Get UDP packet includs EDATA rcvmsg = receive_msg() # ERXUDP print( rcvmsg,"\r\n") intpower_str = "----W" # error code if rcvmsg.startswith(b"ERXUDP") : print( "ERXUDP received ", "\r\n") cols = rcvmsg.strip().split(b' ') res = cols[9] # UDP EDATA print("EDATA;", res ) #tid = res[4:4+4]; seoj = res[8:8+6] print("SEOJ:", seoj ) #deoj = res[14,14+6] ESV = res[20:20+2] print("ESV:", ESV ) #OPC = res[22,22+2] if seoj == b'028801' and ESV == b'72': # Check ESV(Responce:72) and Source EOJ, is SEOJ SmartMeter(028801) ? EPC = res[24:24+2] print("EPC:", EPC, "\r\n" ) # 瞬時電力計測値(E7) if EPC == b'E7': hexPower = res[28:28+8] print( "Value:", hexPower, "\r\n") intPower = int( hexPower, 16 ) intpower_str=str(intPower)+"W" # return intpower_str return intpower_str def tick(): global label global ipv6Addr global req_cmd global prv_pwrstr now = time.strftime("%H:%M:%S") now_sec = now[6:6+2] if now_sec == "00": powstr_1= get_edata( ipv6Addr, req_cmd ) prv_pwrstr = powstr_1 else: powstr_1 = prv_pwrstr now = time.strftime("%H:%M:%S") label.config( text=powstr_1 + " at " + now , font=("Times",'30')) label.after(1000, tick) return now # Set up Main Window root=tk.Tk() w = root.winfo_screenwidth() h = root.winfo_screenheight() w = w-450 h = h-250 root.title("瞬間消費電力量") root.geometry("400x150+"+str(w)+"+"+str(h)) # Set up WiSun and connection to B-Route COM="COM3" # RS-WSUHA-P on Windows11 / Check COM port number by Device Manager bitRate=115200 CRLF = bytes([ 13,10 ]) rbpwd = "02xxxxUAxxxx" # Authentication pass word 送配電会社から通知されたPWに変更して下さい rbid = "0000009A05xxxx88400050xxxxxxD4C0" # Authentication ID 送配電会社から通知された認識用IDに変更して下さい ser = serial.Serial(COM, bitRate, timeout=10 ) # init Serial Port print("\r\nWSUHA & SmartMeter communication test 6R0\r\n ") # Reset WSUHA command buffer ser.write(b'SKRESET\r\n') skcmd = b"SKRESET" rtn_cd = receive_echoback( skcmd ) # get ECHOBACK rtn_cd = receive_OK() # get 'OK' # WSUHA F.W version Check #ser.write(b'SKVER\r\n') # ecbk = ser.readline() # echo back #print( "Echo back:", ser.readline(), "\r\n") #print("BP3501 Version:", ser.readline(), "\r\n") # Set B-route Authentication Password ser.write(b'SKSETPWD C ') ser.write( rbpwd.encode('utf-8')) ser.write( CRLF ) skcmd = b"SKSETPWD" rtn_cd = receive_echoback( skcmd ) # get ECHOBACK rtn_cd = receive_OK() # get 'OK' # Set B-route Authentication ID ser.write(b'SKSETRBID ') ser.write(rbid.encode('utf-8') ) ser.write( CRLF ) skcmd = b"SKSETRBID" rtn_cd = receive_echoback( skcmd ) # get ECHOBACK rtn_cd = receive_OK() # get 'OK' # B-Route scan to find SmartMeter scanduration = 6 scan_data = {} # area for scaned results print("\r\nScan start to detect SmartMeter\r\n") ser.write(b"SKSCAN 2 FFFFFFFF 6 0 \r\n") skcmd = b"SKSCAN" rtn_cd = receive_echoback( skcmd ) # get ECHOBACK rtn_cd = receive_OK() # get 'OK' # Retry SCAN to get some responce from SmartMeter scan_loop = True while scan_loop : rcvmsg = receive_msg() print("Scan Result:", rcvmsg, "\r\n", end="") if rcvmsg.startswith(b"EVENT 22") : # SCAN ends : receive EVENT 22 scan_loop = False elif rcvmsg.startswith(b" ") : # Receive Data after doubel space # " " # Channel:39 # Channel Page:09 # Pan ID:FFFF # Addr:FFFFFFFFFFFFFFFF # LQI:A7 # PairID:FFFFFFFF cols = rcvmsg.strip().split(b':') scan_data[cols[0]] = cols[1] # pull out Channel and set it to S2 reg. ser.write(b"SKSREG S2 " + scan_data[b"Channel"] + b"\r\n") skcmd = b"SKSREG" rtn_cd = receive_echoback( skcmd ) # get ECHOBACK rtn_cd = receive_OK() # get 'OK' # pull out Pan ID and set it to S3 reg. ser.write(b"SKSREG S3 " + scan_data[b"Pan ID"] + b"\r\n") skcmd = b"SKSREG" rtn_cd = receive_echoback( skcmd ) # get ECHOBACK rtn_cd = receive_OK() # get 'OK' # Convert MAC Address(64bit) to IPV6 address ser.write(b"SKLL64 " + scan_data[b"Addr"] + b"\r\n") skcmd = b"SKLL64" rtn_cd = receive_echoback( skcmd ) # get ECHOBACK ipv6Addr = ser.readline().strip() print("IPv6 Address:", ipv6Addr, "\r\n") # start to set up PANA Connection sequence ser.write(b"SKJOIN " + ipv6Addr + b"\r\n"); skcmd = b"SKJOIN" rtn_cd = receive_echoback( skcmd ) # get ECHOBACK rtn_cd = receive_OK() # get 'OK' # Wait PANA Connection completes until receive EVENT25. bConnected = False while not bConnected : rcvmsg = receive_msg() print(rcvmsg, "\r\n") if rcvmsg.startswith(b"EVENT 24") : print("PANA Connection failed.", "\r\n") sys.exit() # Quit to Windows11 elif rcvmsg.startswith(b"EVENT 25") : print("PANA Connection completed.", "\r\n" ) bConnected = True # Set Serial communication timeout ser.timeout = 3 # set up main frame and put it frame = ttk.Frame(root) #label_frame = tk.Label(frame) #button_frame = tk.Button(frame) # 上から順番に配置 Label -> Button frame.pack(fill = tk.BOTH, padx=20,pady=10) #Set up text to store StringVar text = tk.StringVar(frame) text.set("更新") # button = tk.Button(frame, textvariable=text, width=20, height=10, font=("Times", "15", "bold"), command=value_update) button = tk.Button(frame, textvariable=text, font=("Times", "15", "bold"), command=value_update) powstr = get_edata( ipv6Addr, req_cmd ) prv_pwrstr = powstr now = time.strftime("%H:%M:%S") label = Label(frame, text=powstr +" at " + now , font=("Times", '30')) print("Loop Start","\r\n") # clock = tk.Label( root, font=("times", 50 ,"bold" )) # clock.pack() now = tick() label.pack() button.pack() # main loop root.mainloop() ser.close() print("WSUHWSUHA B-route test END")
スクリプト(zip)ダウンロード ※ e2eStore会員ログインが必要です。