スマートメータからWi-SUN Bルートで電力量取得 – サンプルスクリプト[WSUHA_broute_demo_01.py]

 
サンプルスクリプト [WSUHA_broute_demo_01.py]



#
#   RS-WSUHA-P function demo program Ver. 6R0
#      WSUHA_broute_demo_01.py
#
#   Get Electric Power data from Smart Meter through Wi-SUN B route
#
#           02/June/2023  Copyright RATOC Systems, Inc. Osaka Japan
#
#    Important Notice : Use the WOPT command to receive ERXUDP data in HEX format before executing this program.
#

import serial
# import time
# from tkinter import Tk, Label

from tkinter import Label  # Tk
import tkinter.ttk as ttk
import tkinter as tk
import time

# ECHONET Lite request frame, field by field.
# Layout used below: EHD1 EHD2 | TID | SEOJ | DEOJ | ESV | OPC | EPC | PDC
ehd1, ehd2 = 0x10, 0x81      # header: ECHONET Lite, EDATA format 1
tidh, tidl = 0x52, 0x53      # transaction ID, ASCII "R" "S" (matched as b'5253' in replies)

# Source object (this program)
Seoj_x1 = 0x05               # class group code: management/operation device group
Seoj_x2 = 0xFF               # class code: controller class
Seoj_x3 = 0x01               # instance code

# Destination object (the meter)
Deoj_x1 = 0x02               # class group code: housing-related device group
Deoj_x2 = 0x88               # class code: low-voltage smart electric energy meter class
Deoj_x3 = 0x01               # instance code

esv_req = 0x62               # service code: property read request (answered with ESV 0x72)
opc_req = 0x01               # number of properties requested
epc_req = 0xE7               # property code to read (instantaneous power, see get_edata)
pdc_req = 0x00               # property data byte count (no data in a read request)

# The 14-byte payload that is handed to SKSENDTO.
req_cmd = bytes((
    ehd1, ehd2,
    tidh, tidl,
    Seoj_x1, Seoj_x2, Seoj_x3,
    Deoj_x1, Deoj_x2, Deoj_x3,
    esv_req, opc_req, epc_req, pdc_req,
))

def receive_msg():
    """Read the next relevant line from the WSUHA serial port.

    Lines that do not start with ``ERXUDP`` (events, scan result lines, or
    ``b""`` when ``ser.readline()`` times out) are returned immediately.
    ``ERXUDP`` lines are returned only when the transaction ID found in
    their EDATA field is ``5253`` (the TID this script sends); other
    ``ERXUDP`` lines are skipped and reading continues.

    Returns:
        bytes: the raw line as read from the port (may be ``b""``).
    """
    while True:
        rcvmsg = ser.readline()
        if not rcvmsg.startswith(b"ERXUDP"):
            # Not a UDP notification: let the caller decide what to do with it.
            return rcvmsg

        rcv_txt = rcvmsg.strip().split(b' ')
        if len(rcv_txt) < 10:
            # Truncated/garbled ERXUDP line: there is no EDATA field to inspect,
            # so it cannot be our answer. Skip it instead of raising IndexError.
            print("Malformed ERXUDP ignored:", rcvmsg)
            continue

        rcv_edata = rcv_txt[9]               # UDP  EDATA (hex text)
        print("EDATA;", rcv_edata, "\r\n")
        rcv_tid = rcv_edata[4:4+4]           # hex chars 4..7 hold the TID
        print("Receiveed TID:", rcv_tid)
        if rcv_tid == b'5253':               # our own TID -> this is the reply
            return rcvmsg
        # Otherwise it belongs to another transaction: keep reading.

#   Wait SKCOMMAND Echoback from WSUHA
def receive_echoback( skcmd ):
    """Consume serial lines until the echo of ``skcmd`` is seen.

    Args:
        skcmd: bytes prefix of the command that was just written.

    Returns:
        int: 0 when a line starting with ``skcmd`` was read,
        2 when ``ser.readline()`` returned ``b""`` (time out).
    """
    while True:
        line = ser.readline()
        if line == b"":
            # Time out: the WSUHA did not send the command back.
            return 2
        if line.startswith(skcmd):
            print("Echoback : ", line)
            return 0
        # Any other line is discarded; keep waiting for the echo.

#  Wait 'OK' from WSUHA
def receive_OK():
    """Consume serial lines until one starting with ``OK`` is seen.

    Returns:
        The ``OK`` line itself (bytes) on success, or the integer 2 when
        ``ser.readline()`` returned ``b''`` (time out). Note the two return
        types differ; callers in this script ignore the value.
    """
    while True:
        line = ser.readline()
        if line == b'':
            # Time out error.
            return 2
        if line.startswith(b"OK"):
            print("Ack : ", line, "\r\n")
            return line
        # Lines before the acknowledgement are dropped.


def value_update():
    """Button callback: fetch a fresh reading and show it with the current time.

    The button caption (``text``) is switched to "更新中" while the request is
    in flight and back to "更新" afterwards. The new reading is also stored in
    the module-level ``prv_pwrstr`` so that ``tick`` keeps showing it.
    """
    global prv_pwrstr

    text.set("更新中")
    # get_edata() blocks on the serial port, so the caption change would not be
    # painted until after it had already been reverted. Flush pending redraws
    # first so the user actually sees the "updating" state.
    label.update_idletasks()

    try:
        powstr_1 = get_edata( ipv6Addr, req_cmd )
        prv_pwrstr = powstr_1

        now = time.strftime("%H:%M:%S")
        label.config( text=powstr_1 + " at " + now, font=("Times", '30'))
    finally:
        # Always restore the caption, even if the serial exchange raised.
        text.set("更新")
    return

def get_edata( ipv6Addr, req_cmd ):
    """Send one ECHONET Lite request through SKSENDTO and decode the reply.

    Args:
        ipv6Addr: bytes, destination address as returned by SKLL64.
        req_cmd: bytes, the binary ECHONET Lite frame to send.

    Returns:
        str: ``"<n>W"`` when the reply is an ERXUDP line whose EDATA has
        SEOJ ``028801``, ESV ``72`` and EPC ``E7``; otherwise the
        placeholder ``"----W"``.
    """
    # Send Data Request Command
    print("Send Request Command ")
    # The 4-hex-digit length field is derived from the payload (000E for the
    # 14-byte default frame) so that other frames can be sent as well.
    ser.write(b"SKSENDTO 1 " + ipv6Addr + b" 0E1A 1 0 " + b"%04X " % len(req_cmd))
    ser.write( req_cmd )

    receive_echoback( b"SKSENDTO" )           # get ECHOBACK
    receive_OK()                              # get 'OK'

    #  Get UDP packet that includes EDATA
    rcvmsg = receive_msg()         # ERXUDP
    print( rcvmsg,"\r\n")
    intpower_str = "----W"         # error code

    if not rcvmsg.startswith(b"ERXUDP"):
        return intpower_str

    print(  "ERXUDP received ", "\r\n")
    cols = rcvmsg.strip().split(b' ')
    if len(cols) < 10:
        # Truncated line: no EDATA field to decode.
        return intpower_str
    res = cols[9]   # UDP  EDATA (hex text)
    print("EDATA;", res )
    seoj = res[8:8+6]
    print("SEOJ:", seoj )
    ESV = res[20:20+2]
    print("ESV:", ESV )

    # Check ESV (response: 72) and source EOJ (smart meter: 028801).
    if seoj != b'028801' or ESV != b'72':
        return intpower_str

    EPC = res[24:24+2]
    print("EPC:", EPC, "\r\n" )
    # Instantaneous power measurement (E7)
    if EPC != b'E7':
        return intpower_str

    hexPower = res[28:28+8]
    print( "Value:", hexPower, "\r\n")
    if len(hexPower) != 8:
        # Incomplete value; a partial field would decode to a wrong number.
        return intpower_str
    try:
        # Decode as a signed 32-bit big-endian integer so that negative
        # readings (reverse power flow) are not shown as huge positive values.
        # NOTE(review): signedness taken from the ECHONET Lite smart meter
        # class definition of EPC 0xE7 - confirm against the specification.
        intPower = int.from_bytes(
            bytes.fromhex( hexPower.decode("ascii") ), "big", signed=True )
    except (ValueError, UnicodeDecodeError):
        return intpower_str

    return str(intPower) + "W"

def tick():
    """Refresh the label once and re-arm itself to run again in 1000 ms.

    When the seconds field of the current time is "00" a new reading is
    requested with get_edata() and cached in ``prv_pwrstr``; otherwise the
    cached reading is shown again.

    Returns:
        str: the "HH:MM:SS" time stamp that was written to the label.
    """
    global prv_pwrstr

    seconds = time.strftime("%H:%M:%S")[6:8]
    if seconds == "00":
        prv_pwrstr = get_edata( ipv6Addr, req_cmd )
    shown = prv_pwrstr

    # Time stamp is taken after the (possibly slow) request has finished.
    stamp = time.strftime("%H:%M:%S")
    label.config(text=shown + " at " + stamp, font=("Times", '30'))
    label.after(1000, tick)
    return stamp

# Create the main window: 400x150, offset 450 px from the right edge and
# 250 px from the bottom edge of the screen.
root = tk.Tk()

w = root.winfo_screenwidth() - 450
h = root.winfo_screenheight() - 250

root.title("瞬間消費電力量")
root.geometry("400x150+{}+{}".format(w, h))

#  Serial port settings and B-route credentials

COM = "COM3"         # RS-WSUHA-P on Windows11 / check the COM port number in Device Manager
bitRate = 115200
CRLF = b"\r\n"

# Authentication password: replace with the one issued by your power transmission/distribution company
rbpwd = "02xxxxUAxxxx"
# Authentication ID: replace with the one issued by your power transmission/distribution company
rbid = "0000009A05xxxx88400050xxxxxxD4C0"

# Open the serial port; reads give up after 10 seconds.
ser = serial.Serial(port=COM, baudrate=bitRate, timeout=10)

print("\r\nWSUHA & SmartMeter communication test 6R0\r\n ")

# Reset WSUHA command buffer
ser.write(b'SKRESET\r\n')
rtn_cd = receive_echoback( b"SKRESET" )   # get ECHOBACK
rtn_cd = receive_OK()                     # get 'OK'

#  WSUHA F.W version Check
#ser.write(b'SKVER\r\n')
# ecbk = ser.readline()  # echo back
#print( "Echo back:", ser.readline(), "\r\n")
#print("BP3501 Version:", ser.readline(), "\r\n")

# Set B-route Authentication Password.
# The length argument is computed from the password (hex, "C" for the usual
# 12 characters) instead of being hard-coded, so it always matches rbpwd.
pwd_bytes = rbpwd.encode('utf-8')
ser.write(b'SKSETPWD %X ' % len(pwd_bytes) + pwd_bytes + CRLF)
rtn_cd = receive_echoback( b"SKSETPWD" )  # get ECHOBACK
rtn_cd = receive_OK()                     # get 'OK'


# Set B-route Authentication ID
ser.write(b'SKSETRBID ' + rbid.encode('utf-8') + CRLF)
rtn_cd = receive_echoback( b"SKSETRBID" ) # get ECHOBACK
rtn_cd = receive_OK()                     # get 'OK'

#  B-Route scan to find SmartMeter
scanduration = 6            # SKSCAN duration argument for the first attempt
max_scanduration = 10       # give up once the duration would exceed this value
scan_data = {}              # area for scanned results
# Fields that the following connection steps read from scan_data.
required_scan_keys = (b"Channel", b"Pan ID", b"Addr")

print("\r\nScan start to detect SmartMeter\r\n")
# Retry SCAN with a longer duration until the SmartMeter answers.
while not all(key in scan_data for key in required_scan_keys):
    if scanduration > max_scanduration:
        # Nothing usable was found: stop here instead of failing later with
        # a KeyError when scan_data is read.
        print("SmartMeter not found.", "\r\n")
        ser.close()
        raise SystemExit(1)

    ser.write(b"SKSCAN 2 FFFFFFFF %X 0 \r\n" % scanduration)
    rtn_cd = receive_echoback( b"SKSCAN" )    # get ECHOBACK
    rtn_cd = receive_OK()                     # get 'OK'

    scan_loop = True
    while scan_loop :
        rcvmsg = receive_msg()
        print("Scan Result:", rcvmsg, "\r\n", end="")

        if rcvmsg.startswith(b"EVENT 22") :
            # SCAN ends : receive EVENT 22
            scan_loop = False
        elif rcvmsg.startswith(b"  ") :
            # Result lines start with a double space, e.g.
            #  Channel:39
            #  Channel Page:09
            #  Pan ID:FFFF
            #  Addr:FFFFFFFFFFFFFFFF
            #  LQI:A7
            #  PairID:FFFFFFFF
            key, sep, value = rcvmsg.strip().partition(b':')
            if sep:
                # Lines without ':' carry no key/value pair and are skipped.
                scan_data[key] = value

    scanduration += 1

# Write the scanned Channel to register S2 and the scanned Pan ID to register S3.
for sreg, field in ((b"S2", b"Channel"), (b"S3", b"Pan ID")):
    ser.write(b"".join((b"SKSREG ", sreg, b" ", scan_data[field], b"\r\n")))
    rtn_cd = receive_echoback( b"SKSREG" )    # get ECHOBACK
    rtn_cd = receive_OK()                     # get 'OK'

# Convert MAC Address(64bit) to IPV6 address.
# The line following the echo back is taken as the address (no 'OK' is awaited here).
ser.write(b"".join((b"SKLL64 ", scan_data[b"Addr"], b"\r\n")))
rtn_cd = receive_echoback( b"SKLL64" )        # get ECHOBACK
ipv6Addr = ser.readline().strip()
print("IPv6 Address:", ipv6Addr, "\r\n")

# Start the PANA connection sequence
ser.write(b"".join((b"SKJOIN ", ipv6Addr, b"\r\n")))
rtn_cd = receive_echoback( b"SKJOIN" )        # get ECHOBACK
rtn_cd = receive_OK()                         # get 'OK'

# Wait until the PANA connection completes (EVENT 25) or fails (EVENT 24).
bConnected = False
while not bConnected :
    rcvmsg = receive_msg()
    print(rcvmsg, "\r\n")
    if rcvmsg.startswith(b"EVENT 24") :
        print("PANA Connection failed.", "\r\n")
        # Release the port and stop with a non-zero exit status.
        # SystemExit is raised directly because this file does not import sys.
        ser.close()
        raise SystemExit(1)
    elif rcvmsg.startswith(b"EVENT 25") :
        print("PANA Connection completed.", "\r\n" )
        bConnected = True

# From here on serial reads give up after 3 seconds.
ser.timeout = 3

# Main frame; widgets are stacked top to bottom: Label -> Button
frame = ttk.Frame(root)
frame.pack(fill=tk.BOTH, padx=20, pady=10)

# Button caption lives in a StringVar so value_update() can change it.
text = tk.StringVar(frame, value="更新")
button = tk.Button(
    frame,
    textvariable=text,
    font=("Times", "15", "bold"),
    command=value_update,
)

# First reading, shown immediately and cached for tick().
powstr = get_edata( ipv6Addr, req_cmd )
prv_pwrstr = powstr

now = time.strftime("%H:%M:%S")
label = tk.Label(frame, text=powstr + " at " + now, font=("Times", '30'))

print("Loop Start","\r\n")

# Start the once-per-second refresh.
now = tick()

label.pack()
button.pack()

# main loop; returns when the window is closed
root.mainloop()

ser.close()

print("WSUHWSUHA B-route test END")


スクリプト(zip)ダウンロード  ※ e2eStore会員ログインが必要です。