Controllare Waveshare Modbus POE ETH Relay con Python

Controllare Waveshare Modbus POE ETH Relay con Python

1920 1280 Nicola Montemurro

Periodo, per me, molto intenso; ci sono all’orizzonte un po’ di novità, tutte in sospeso; notizie positive, ma che come spesso mi è capitato, non sono mai in “fila”, ma “sovrapposte” e dato che dipendono l’una dall’altra, perché si incastrino e producano il risultato tanto agognato, occorre attendere che tutte siano a buon fine e, quindi, per “distrarre” la mente, scrivo codice.

La programmazione, per me, è sempre stato lo strumento di “pace”, rilassante, ma nel contempo stimolante (potrà sembrare un controsenso, ma vi assicuro che non lo è) e comunque, in ogni caso, didattico .

Era un po’ di tempo che avevo questa scheda Relay a 8 canali, che avevo acquistato per fare qualche esperimento in ottica automazione Modbus, come descritto in Controllare Waveshare Modbus POE ETH Relay in Home Assistant  e sull’onda degli articoli “Multipresa smart con Raspberry PI” e “Multipresa smart MQTT integrata in Home Assistant” ho provato estendere, ulteriormente, il livello di integrazione.

Su Github si trova uno script per la gestione “ibrida” della stessa (vedi su github), cioè che, utilizza il protocollo di rete TCP per trasportare i comandi nativi RTU (seriali), mentre è possibile utilizzare direttamente il Modbus TCP, lasciando fare alla scheda stessa la “traduzione” (link alla pagina ufficiale).

Lo script mostrato di seguito è stato derivato da quello preso da github, esteso e modificato, eliminando la gestione del CRC, inutile se si utilizza il protocollo TCP.

# ----------------------------------------------------------------------------------
# - Script file name    :       waveshare_tcp.py
# - Author              :       Nicola Montemurro
# - DNS administrator   :       Nicola Montemurro  - Tel. XXX, Mobile: XXX -
# - Create              :       12.01.2024
# - Last Update         :       12.01.2024
# - Description         :
# - Position            :       /usr/local/script
# - note                :       NON modificare senza AUTORIZZAZIONE dell'AMMINISTRATORE
#  -----------------------------------------------------------------------------------

### INFO => https://www.waveshare.com/wiki/Modbus_POE_ETH_Relay

import argparse
import socket
import time

ACTION_ON = 0xFF
ACTION_OFF = 0
ACTION_FLIP = 0x55

ACTION_FLASH_ON = 0x02
ACTION_FLASH_OFF = 0x04

ACTION_ALL_ON = 255
ACTION_ALL_OFF = 0

cmd = [0, 0, 0, 0, 0 ,0 , 0, 0, 0, 0, 0, 0]

cmd[5] = 0x06  #Byte length
cmd[6] = 0x01  #Device address
cmd[7] = 0x05  # command Control relay commands

def send_cmd(s: socket.socket, cmd: bytearray):
#    print(cmd) ## DEBUG
    s.send(bytearray(cmd))
    time.sleep(0.2)


def relay_cmd(n, command):
    cmd[8] = 0
    cmd[9] = n
    cmd[10] = command
    return cmd

def relay_flash_cmd(n, command, duration):
    cmd[8] = command
    cmd[9] = n
    cmd[11] = duration
    return cmd

def main():
    parser = argparse.ArgumentParser()
    parser.add_argument("--host", default="192.168.1.100", help="Relay host IP address.")
    parser.add_argument("--port", type=int, default=502, help="Port number.")
    parser.add_argument("--loglevel", default="INFO", help="Log level.")

    subparsers = parser.add_subparsers(dest="command")

    relay_parser = subparsers.add_parser("relay")
    relay_parser.add_argument("number", type=int, choices=list(range(1, 9)) + [256], help="Relay number 1-8, 256 for all relays.")
    relay_parser.add_argument("action", choices=["on", "off", "flip"], help="Relay action.")

    flash_parser = subparsers.add_parser("flash")
    flash_parser.add_argument("number", type=int, choices=list(range(1, 9)), help="Relay number 1-8.")
    flash_parser.add_argument("action", choices=["on", "off"], help="Flash action.")

    def duration_type(value):
        ivalue = int(value)
        if ivalue < 1 or ivalue > 65535:
            raise argparse.ArgumentTypeError("Duration must be between 1 and 65535")
        return ivalue

    flash_parser.add_argument("duration", type=duration_type,
                              help="Duration of the flash action in 100ms increments. `10` equals 1 second. (1-65535)")

    args = parser.parse_args()
    if not args.command:
        parser.print_help()
        return

    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        s.connect((args.host, args.port))

        if args.command == "relay":
            coms = {
                'on': ACTION_ON,
                'off': ACTION_OFF,
                'flip': ACTION_FLIP
            }
            send_cmd(s, relay_cmd(args.number - 1, coms.get(args.action)))

        elif args.command == "flash":
            coms = {
                'on': ACTION_FLASH_ON,
                'off': ACTION_FLASH_OFF
            }
            send_cmd(s, relay_flash_cmd(args.number - 1, coms.get(args.action), args.duration))

if __name__ == '__main__':
    main()

Esempi d'uso

La prima parte del comando riguarda l’identificazione dell’indirizzo del device Waveshare e della porta alla quale inviare i comandi (i valori di default host sono impostati a 192.168.1.100 e la porta a 502, per cui, qualora si volesse controllare un solo device, si possono impostare i valori di riferimento direttamente nello script e omettendoli nella riga di comando, altrimenti, qualora si volessero controllare più devices dovranno essere specificati, per indicare il device target, di volta in volta.

python waveshare_tcp.py --host 172.16.2.2 --port 502

A questo devono seguire i comandi,  da eseguire sulla scheda, riguardanti i relay, che sono:

  • relay n on|off|flip – in cui n indica il numero di relay (da 1 a 8), o 256 per tutti seguito dallo stato on, off, o flip (inversione di stato)
  • flash n on|off t – in cui n indica il numero di relay (da 1 a 8), seguito dallo stato on, off, (lo stato sul quale applicare il tempo di flash) e t un numero compreso tra 1 e 65535 indicante il tempo di flash ( 1 = 100ms)

Accensione, Spegnimento e Flip (inversione di stato)

python waveshare_tcp.py --host 172.16.2.2 --port 502 relay 5 on

Accende il relay 5

python waveshare_tcp.py --host 172.16.2.2 --port 502 relay 5 off

Spegne il relay 5

python waveshare_tcp.py --host 172.16.2.2 --port 502 relay 7 flip

Inverte lo stato del relay 7 (se acceso lo spegne altrimenti lo accende)

Flashing di uno stato

python waveshare_tcp.py --host 172.16.2.2 --port 502 flash 5 off 50

Spegne il relay 5 per 5 secondi e poi lo accende

python waveshare_tcp.py --host 172.16.2.2 --port 502 flash 7 on 100

Accende il relay 7 per 10 secondi e poi lo spegne

Accensione, spegnimento e flip di tutti i Relay

python waveshare_tcp.py --host 172.16.2.2 --port 502 relay 256 on

Accende tutti i relay

python waveshare_tcp.py --host 172.16.2.2 --port 502 relay 256 off

Spegne tutti i relay

python waveshare_tcp.py --host 172.16.2.2 --port 502 relay 256 flip

Inverte lo stato di tutti i relay

Stress test

for /l %a in (1, 1, 100) do (for /l %b in (1,1,8) do python waveshare_tcp.py --host 172.16.2.2 relay %b flip)

Un test, da eseguire su Windows, per verificare che tutto funzioni correttamente; eseguirà per 100 volte il flip di tutti i  relay in sequenza.

Per qualsiasi suggerimento oppure qualora abbiate qualche difficoltà, sono sempre disponibile.

    Preferenze Privacy

    Quando visiti il nostro sito web, possono essere memorizzate alcune informazioni, di servizi specifici, tramite il tuo browser, di solito sotto forma di cookie. Qui puoi modificare le tue preferenze sulla privacy. Il blocco di alcuni cookie può influire sulla tua esperienza sul nostro sito Web e sui servizi che offriamo.

    Click to enable/disable Google Analytics tracking code.
    Click to enable/disable Google Fonts.
    Click to enable/disable Google Maps.
    Click to enable/disable video embeds.
    Il nostro sito web utilizza cookie, principalmente di terze parti. Personalizza le preferenze sulla privacy e/o acconsenti all'utilizzo dei cookie.