public:radio:dormant:netradio_control
Table of Contents
Netradio Control
Python code for TCP/IP control of multiple radios
Server
This provides TCP/IP access to one or more radios, allowing control of Frequency, Mode, Pre-amp and Attenuator. It also gives remote access to S-meter readings.
There are separate rig-dependant command modules that then provide a standard interface for commands sent by a client, regardless of the particular radio connected.
There are modules for
- Icom C-IV radios such as IC-R75, IC-718, IC-7200 etc.
- AOR AR7030
- Icom IC-M710 marine
Server
- server.py
from aor import * from icom import * from conf import * from m710 import * import SocketServer import time try: import readline except: pass lock = threading.Lock() radios = [] r1 = Icom(n1, a1, cal1) radios.append(n1) r2 = m710(n4) radios.append(n4) #r2 = Icom(n2, a2, cal2) #radios.append(n2) r3 = Ar7030(n3) radios.append(n3) print radios #print r1.digi_off() #print r2.remote_on() def count_radios(): count = len(radios) return count def list_radios(): radiolist = "" for n in range(0, len(radios)): r = radios[n] radiolist += (r + " ") return radiolist def write_file(text): filename = 'commandlog.txt' f = open(filename, 'a+') # a+ is "append to file, create it if it doesn't exist" timenow = time.strftime("%d/%m/%Y %H:%M:%S", time.gmtime(time.time())) log = " ".join((timenow, text)) # make an entry for the log by joining the timestamp with the text passed in f.write(log) f.close() def write_con(text): filename = 'conlog.txt' f = open(filename, 'a+') # a+ is "append to file, create it if it doesn't exist" timenow = time.strftime("%d/%m/%Y %H:%M:%S", time.gmtime(time.time())) log = " ".join((timenow, text)) # make an entry for the log by joining the timestamp with the text passed in f.write(log) f.close() # The Server class ThreadedRequestHandler(SocketServer.StreamRequestHandler): def handle(self): # we find the current thread for the client connection just set up, to # use in the log file cur_thread = threading.currentThread() # log the new connection details write_con("Connect from %s using %s \n" % ((self.client_address[0]), cur_thread.getName())) # print to the server's console the new connection IP address/port print self.client_address # loop to handle client requests.... while True: # using StreamRequestHandler means our input from the client # is "file-like" and can be read with "file-like" commands # we read a line at a time, using readline() cmd = self.rfile.readline().lower() # to keep things clean, we remove any characters that aren't # "printable" simple ASCII # these are between 32 and 127 in the ASCII table # we look at each character, and then make a new word by # .join()ing each accepted character with no space in between asccmd = "".join(x for x in cmd if ord(x) < 128 and ord(x) > 31) # we make a list called "words" holding the received words which # will be inspected by various functions words = asccmd.split() # If a client uses sock.close() itself, to disconnect, it appears that # we read a continuous stream of "" on the dead # socket, which puts CPU to 100%. # # The "While" loop is probably responsible, but I can't see another # way to keep the connection up for multiple commands. # # Further connection are accepted due to the Threaded nature of the server. # The CPU load is unacceptable though # HACK ?>>>>> # Looking for "" and then breaking # the connection from the server end (even though the client has # gone) cures this. if cmd == "": break else: pass # if the words list is empty, go back and get more input if not words: continue # we have input.... # filter based on the first word - these are the # pre-set commands the server will accept # the client wants to know the currently available # radio names - held in the variable "rnames" elif words[0] == "getnames": self.wfile.write(rnames) # words[-1] (the last word in the list) will always be the # radio name. We give the variable "my_radio" this value, for # identifying which radio object to apply the method to elif words[0] == "count": count = count_radios() self.wfile.write(count) elif words[0] == "ident": ident_text = "GM4SLV Radio Server" radio_list = list_radios() self.wfile.write(ident_text + "\rAvailable : \n" + radio_list + "\r\n") elif words[0] == "getmode": my_radio = eval(words[-1]) mode = my_radio.get_mode() self.wfile.write(mode) elif words[0] == "getfreq": my_radio = eval(words[-1]) freq = words[1] freq = my_radio.get_freq() self.wfile.write(freq) elif words[0] == "setmode": my_radio = eval(words[-1]) mode = words[1] newmode = my_radio.set_mode(mode) self.wfile.write(newmode) elif words[0] == "setfreq": my_radio = eval(words[-1]) try: freq = float(words[1]) newfreq = my_radio.set_freq(freq) self.wfile.write(newfreq) except ValueError: #freq = float(my_radio.get_freq()) self.wfile.write("Error in freq. %s No change\r\n" % words[1]) elif words[0] == "getsmeter": my_radio = eval(words[-1]) smeter = round(float(my_radio.get_smeter()), 1) self.wfile.write(smeter) elif words[0] == "gets": my_radio = eval(words[-1]) s = my_radio.get_s() self.wfile.write(s) elif words[0] == "listradios": radios = list_radios() self.wfile.write(radios) elif words[0] == "getpreamp": my_radio = eval(words[-1]) preamp = my_radio.get_pre() self.wfile.write(preamp) elif words[0] == "preampon": my_radio = eval(words[-1]) preamp = my_radio.pre_on() self.wfile.write(preamp) elif words[0] == "preamp2on": my_radio = eval(words[-1]) preamp = my_radio.pre_2_on() self.wfile.write(preamp) elif words[0] == "preampoff": my_radio = eval(words[-1]) preamp = my_radio.pre_off() self.wfile.write(preamp) elif words[0] == "getatt": my_radio = eval(words[-1]) att = my_radio.get_att() self.wfile.write(att) elif words[0] == "atton": my_radio = eval(words[-1]) att = my_radio.att_on() self.wfile.write(att) elif words[0] == "attoff": my_radio = eval(words[-1]) att = my_radio.att_off() self.wfile.write(att) elif words[0] == "tune": my_radio = eval(words[-1]) tune = my_radio.tune() self.wfile.write(tune) elif words[0] == "getpwr": my_radio = eval(words[-1]) pwr = my_radio.get_pwr() self.wfile.write(pwr) elif words[0] == "setpwr": my_radio = eval(words[-1]) spwr = words[1] pwr = my_radio.set_pwr(spwr) self.wfile.write(pwr) elif words[0] == "quit": write_con("Got quit from {}\n".format(self.client_address[0])) # log it self.wfile.write("Goodbye! \r\n") # say Goodbye break else: # nothing in words[0] matches a pre-set command.... write_file("Received %s\n" % words) # log it, it's unusual self.wfile.write("Command not recognized\r\n") # inform the client class ThreadedIcomServer(SocketServer.ThreadingMixIn, SocketServer.TCPServer): pass if __name__ == '__main__': # define the lock to be used on the serial port access lock = threading.Lock() # address ('' = all available interfaces) to listen on, and port number address = ('', 9999) server = ThreadedIcomServer(address, ThreadedRequestHandler) server.allow_reuse_address = True # define that the server will be threaded, and will serve "forever" ie. not quit after the client disconnects t = threading.Thread(target=server.serve_forever) # start the server thread t.start() write_con( "Server loop running in thread: %s\n" % "".join(t.getName()))
Configurtion Module
- cpnf.py
# header preamble = "\xfe" controller = "\xe0" # commands/requests set_freq_cmd = "\x05" set_mode_cmd = "\x06" get_freq_cmd = "\x03" get_mode_cmd = "\x04" get_smeter_cmd = "\x15" + "\x02" get_swr_cmd = "\x15" + "\x12" digi_off_cmd = "\x1a" + "\x04" + "\x00" + "\x00" set_pre_cmd = "\x16" + "\x02" set_pre_off = "\x00" set_pre_on = "\x01" set_pre_2_on = "\x02" set_att_cmd = "\x11" set_att_on = "\x20" set_att_off = "\x00" ptt_on_cmd = "\x1c" + "\x00" + "\x01" ptt_off_cmd = "\x1c" + "\x00" + "\x00" pwr_cmd = "\x14" + "\x0a" # end of message eom = "\xfd" # controller responses ack = "\xfb" nak = "\xfa" a1 = "\x5A" n1 = "IC-R75" cal1 = ( 25, 1, 36, 47, 31, 18, 34, 35 ) a2 = "\x5e" # a2 = "\x01" n2 = "IC-718" cal2 = ( 27, 28, 58, 10, 14, 14, 35, 42 ) n3 = "AR7030" n4 = "IC-M710"
Icom C-IV module
- icom.py
import serial import threading from conf import * import time #sport = "COM1" sport = "/dev/ttyS0" sbaud = 9600 lock = threading.Lock() class Icom(object): def __init__(self, model, radio_address, cal): self.ser = serial.Serial(sport, sbaud, timeout=0.1) self.model = model self.radio_address = radio_address self.cal = cal def digi_off(self): sendStr = preamble + preamble + self.radio_address + controller + digi_off_cmd + eom result = self.tx_rx(sendStr) if result[4] == ack: return "Success" elif result[4] == nak: return "NAK received" def get_pre(self): sendStr = preamble + preamble + self.radio_address + controller + set_pre_cmd + eom result = self.tx_rx(sendStr) if not result: return "0" if result[6] == "\x00": return 0 elif result[6] == "\x01": return 1 elif result[6] == "\x02": return 2 def get_pwr(self): sendStr = preamble + preamble + self.radio_address + controller + pwr_cmd + eom result = self.tx_rx(sendStr) if not result: return "0" p1 = ord(result[7]) / 16 p2 = ord(result[7]) % 16 p3 = ord(result[6]) / 16 p4 = ord(result[6]) % 16 pwr = float(100 * (10 * p3 + p4) + (10 * p1 + p2)) return int(pwr*100/255) def set_pwr(self, pwr): #if pwr == "25": # spwr = "\x00" + "\x63" #elif pwr == "50": # spwr = "\x01" + "\x27" #elif pwr == "75": # spwr = "\x01" + "\x91" #elif pwr == "100": # spwr = "\x02" + "\x55" rigpwr = int(pwr) * 255 / 100 print "rigpwr ", rigpwr pwr1 = rigpwr / 100 pwr2 = rigpwr % 100 spwr1 = (pwr1 / 10 * 16) spwr2 = (pwr1 % 10) spwr3 = (pwr2 / 10 * 16) spwr4 = (pwr2 % 10) spwr = chr(spwr1+spwr2) + chr(spwr3+spwr4) #print "spwr ", spwr sendStr = preamble + preamble + self.radio_address + controller + pwr_cmd + spwr + eom result = self.tx_rx(sendStr) if result[4] == ack: return self.get_pwr() elif result[4] == nak: return "NAK received" def pre_on(self): sendStr = preamble + preamble + self.radio_address + controller + set_pre_cmd + set_pre_on + eom result = self.tx_rx(sendStr) if result[4] == ack: return "Success" elif result[4] == nak: return "NAK received" def pre_2_on(self): sendStr = preamble + preamble + self.radio_address + controller + set_pre_cmd + set_pre_2_on + eom result = self.tx_rx(sendStr) if result[4] == ack: return "Success" elif result[4] == nak: return "NAK received" def pre_off(self): sendStr = preamble + preamble + self.radio_address + controller + set_pre_cmd + set_pre_off + eom result = self.tx_rx(sendStr) if result[4] == ack: return "Success" elif result[4] == nak: return "NAK received" def ptt_on(self): sendStr = preamble + preamble + self.radio_address + controller + ptt_on_cmd + eom result = self.tx_rx(sendStr) #print result[5] if not result[4] == ack: return "ptt on" elif result[4] == nak: return "Error" def ptt_off(self): sendStr = preamble + preamble + self.radio_address + controller + ptt_off_cmd + eom result = self.tx_rx(sendStr) #print result[5] if not result[4] == ack: return "ptt off" elif result[4] == nak: return "Error" def get_att(self): sendStr = preamble + preamble + self.radio_address + controller + set_att_cmd + eom result = self.tx_rx(sendStr) if not result: return "0" if result[5] == "\x00": return 0 elif result[5] == "\x20": return 1 def att_on(self): sendStr = preamble + preamble + self.radio_address + controller + set_att_cmd + set_att_on + eom result = self.tx_rx(sendStr) if result[4] == ack: return "Success" elif result[4] == nak: return "NAK received" def att_off(self): sendStr = preamble + preamble + self.radio_address + controller + set_att_cmd + set_att_off + eom result = self.tx_rx(sendStr) if result[4] == ack: return "Success" elif result[4] == nak: return "NAK received" def set_freq(self, freq): fdig = "%010d" % int(freq * 1000) bcd = () for i in (8, 6, 4, 2, 0): bcd += self.freq_bcd(int(fdig[i]), int(fdig[i + 1])) set_freq_val = "" for byte in bcd: set_freq_val += chr(byte) sendStr = preamble + preamble + self.radio_address + controller + set_freq_cmd + set_freq_val + eom result = self.tx_rx(sendStr) if result[4] == ack: return "Set Freq success" elif result[4] == nak: return "NAK received / Freq not supported" def get_freq(self): sendStr = preamble + preamble + self.radio_address + controller + get_freq_cmd + eom result = self.tx_rx(sendStr) if not result: return "0" if len(result) > 0: f = 0 for k in [18, 19, 16, 17, 14, 15, 12, 13, 10, 11]: f = 10 * f + self.nib(result, k) self.freq = (float(f) / 1000) return "%.3f" % self.freq def set_mode(self, mode): print "in set_mode() with ", mode if mode == "data": mode = "rtty" if mode == "lsb": set_mode_val = "\x00" elif mode == "usb": set_mode_val = "\x01" elif mode == "am": set_mode_val = "\x02" elif mode == "cw": set_mode_val = "\x03" elif mode == "rtty": set_mode_val = "\x04" elif mode == "fm": set_mode_val = "\x05" elif mode == "cw-r": set_mode_val = "\x07" elif mode == "rtty-r": set_mode_val = "\x08" elif mode == "s-am": set_mode_val = "\x11" else: return "Mode not recognized" sendStr = preamble + preamble + self.radio_address + controller + set_mode_cmd + set_mode_val + eom result = self.tx_rx(sendStr) if result[4] == ack: return "Set Mode Success" elif result[4] == nak: return "NAK received / Mode not supported" def get_mode(self): sendStr = preamble + preamble + self.radio_address + controller + get_mode_cmd + eom result = self.tx_rx(sendStr) if not result: return "0" mode = "" if result[5] == "\x00": mode = "lsb" elif result[5] == "\x01": mode = "usb" elif result[5] == "\x02": mode = "am" elif result[5] == "\x03": mode = "cw" elif result[5] == "\x04": mode = "rtty" elif result[5] == "\x05": mode = "fm" elif result[5] == "\x08": mode = "rtty-r" elif result[5] == "\x07": mode = "cw-r" elif result[5] == "\x11": mode = "s-am" if mode == "rtty": mode = "data" self.mode = mode return self.mode.upper() def get_s(self): sendStr = preamble + preamble + self.radio_address + controller + get_smeter_cmd + eom result = self.tx_rx(sendStr) if not result: return "0" sm1 = ord(result[7]) / 16 sm2 = ord(result[7]) % 16 sm3 = ord(result[6]) / 16 sm4 = ord(result[6]) % 16 s = float(100 * (10 * sm3 + sm4) + (10 * sm1 + sm2)) return s def get_swr(self): sendStr = preamble + preamble + self.radio_address + controller + get_swr_cmd + eom result = self.tx_rx(sendStr) if not result: return "0" sm1 = ord(result[7]) / 16 sm2 = ord(result[7]) % 16 sm3 = ord(result[6]) / 16 sm4 = ord(result[6]) % 16 swr = float(100 * (10 * sm3 + sm4) + (10 * sm1 + sm2)) return swr def get_smeter(self): s = float(self.get_s()) cal = self.cal s1 = s - cal[0] s2 = s1 - cal[1] s3 = s2 - cal[2] s4 = s3 - cal[3] s5 = s4 - cal[4] s6 = s5 - cal[5] s7 = s6 - cal[6] if s1 <= 0: dbm = -123 adj = s / cal[0] * 10 return str(dbm + adj) elif s2 <= 0: dbm = -113 adj = s1 / cal[1] * 10 return str(dbm + adj) elif s3 <= 0: dbm = -103 adj = s2 / cal[2] * 10 return str(dbm + adj) elif s4 <= 0: dbm = -93 adj = s3 / cal[3] * 10 return str(dbm + adj) elif s5 <= 0: dbm = -83 adj = s4 / cal[4] * 10 return str(dbm + adj) elif s6 <= 0: dbm = -73 adj = s5 / cal[5] * 10 return str(dbm + adj) elif s7 <= 0: dbm = -63 adj = s6 / cal[6] * 20 return str(dbm + adj) else: dbm = -43 adj = s7 / cal[7] * 20 return str(dbm + adj) def get_name(self): return self.model def tune(self): print "tuning" curmode = self.get_mode().lower() #print "Current Mode ",curmode curpwr = self.get_pwr() if curpwr < 98: curpwr = curpwr + 1 #print "Current Power ", curpwr #print "Current percent power ", curpwr self.set_mode("rtty") self.set_pwr(25) #print "Tuning power ", self.get_pwr() #print "PTT On" self.ptt_on() time.sleep(2) swr = self.get_swr() #print "SWR :", swr time.sleep(1) self.ptt_off() #print "PTT Off" self.set_mode(curmode) #print "Mode reset ",self.get_mode() self.set_pwr(curpwr) print "Tuned : (ref pwr : %s)" % swr return "Tuned : (ref pwr : %s)" % swr def tx_rx(self, sendStr): lock.acquire() self.ser.write(sendStr) echo = self.ser.read(len(sendStr)) if len(echo) != len(sendStr): return "0" byte = "0" result = "" count = 0 while byte != eom: byte = self.ser.read() #print "%#02x" % ord(byte) result += byte count += 1 if count > 10: break lock.release() #print "" return result def nib(self, s, i): k = ord(s[i / 2]) if i % 2 == 0: k = k >> 4 return k & 0xf def freq_bcd(self, d1, d2): return (16 * d1 + d2),
AOR AR7030 module
- aor.py
import serial import threading #sport = "/dev/ttyS0" sport = "/dev/ttyUSB1" sbaud = 1200 lock = threading.Lock() fract = float(2 ** 24) / 44545 class Ar7030(object): def __init__(self, model): self.ser = serial.Serial(sport, sbaud, timeout=10) self.model = model def check_bit(self, byte, bit): mask = 1 << bit result = (ord(byte) & mask) if result: return 1 else: return 0 def set_bit(self, byte, bit): mask = 1 << bit result = ord(byte) | mask return result def clear_bit(self, byte, bit): mask = ~(1 << bit) result = (ord(byte) & mask) return result def get_ident(self): sendStr = '\x5f' + '\x40' ident = self.tx_rx(sendStr, False, 8) return ident def get_pre(self): sendStr = '\x50' + '\x32' + '\x48' byte = self.tx_rx(sendStr, False, 1) p = self.check_bit(byte, 4) if p: return 1 else: return 0 return def pre_on(self): #get current 8-bit rxcon byte : # bit0 = filter FS3 # bit1 = filter FS2 # bit2 = filter FS1 # bit3 = filter FS4 # bit4 = preamp enabled # bit5 = atten 0 = 20dB / 1 = 40dB # bit6 = input filter 0 = HF / 1 = LF # bit7 = attenuator enabled sendStr = '\x50' + '\x32' + '\x48' byte = self.tx_rx(sendStr, False, 1) # set bit 4 ON = preamp ON and get the new 8-bit rxcon byte pon = self.set_bit(byte, 4) # split new rxcon byte into two 4-bit nibbles, add 48/96 (\x30 and \x60) high = 48 + (pon >> 4) low = 96 + (pon & 15) sendStr = '\x81' + '\x50' + '\x32' + '\x48' + chr(high) + chr(low) + '\x29' + '\x80' self.tx_rx(sendStr, False, 0) return "Command sent" def pre_off(self): #get current 8-bit rxcon byte sendStr = '\x50' + '\x32' + '\x48' byte = self.tx_rx(sendStr, False, 1) # set bit 4 OFF = preamp OFF and get the new 8-bit rxcon byte pon = self.clear_bit(byte, 4) # split new rxcon byte into two 4-bit nibbles, add 48/96 (\x30 and \x60) high = 48 + (pon >> 4) low = 96 + (pon & 15) sendStr = '\x81' + '\x50' + '\x32' + '\x48' + chr(high) + chr(low) + '\x29' + '\x80' self.tx_rx(sendStr, False, 0) return "Command sent" def get_att(self): sendStr = '\x50' + '\x32' + '\x48' byte = self.tx_rx(sendStr, False, 1) a = self.check_bit(byte, 7) if a: return 1 else: return 0 def att_on(self): #get current 8-bit rxcon byte sendStr = '\x50' + '\x32' + '\x48' byte = self.tx_rx(sendStr, False, 1) # set bit 7 ON = ATT ON and get the new 8-bit rxcon byte pon = self.set_bit(byte, 7) # split new rxcon byte into two 4-bit nibbles, add 48/96 (\x30 and \x60) high = 48 + (pon >> 4) low = 96 + (pon & 15) sendStr = '\x81' + '\x50' + '\x32' + '\x48' + chr(high) + chr(low) + '\x29' + '\x80' self.tx_rx(sendStr, False, 0) return "Command sent" def att_off(self): #get current 8-bit rxcon byte sendStr = '\x50' + '\x32' + '\x48' byte = self.tx_rx(sendStr, False, 1) # set bit 7 OFF = att OFF and get the new 8-bit rxcon byte pon = self.clear_bit(byte, 7) # split new rxcon byte into two 4-bit nibbles, add 48/96 (\x30 and \x60) high = 48 + (pon >> 4) low = 96 + (pon & 15) sendStr = '\x81' + '\x50' + '\x32' + '\x48' + chr(high) + chr(low) + '\x29' + '\x80' self.tx_rx(sendStr, False, 0) return "Command sent" def set_freq(self, freq): fval = freq * fract #print fval b1 = 48 + int(fval / 1048576) fval = fval % 1048576 b2 = 96 + int(fval / 65536) fval = fval % 65536 b3 = 48 + int(fval / 4096) fval = fval % 4096 b4 = 96 + int(fval / 256) fval = fval % 256 b5 = 48 + int(fval / 16) b6 = 96 + int(fval % 16) f_tuple = ( b1, b2, b3, b4, b5, b6 ) freqStr = "" for byte in f_tuple: freqStr += chr(byte) sendStr = '\x81' + '\x50' + '\x31' + '\x4a' + freqStr + '\x24' + '\x80' self.tx_rx(sendStr, False, 0) return "Freq Set" def get_freq(self): sendStr = '\x50' + '\x31' + '\x4a' freqStr = self.tx_rx(sendStr, False, 3) f_val = 0 for k in freqStr: f_val = f_val * 256 + ord(k) return "%.3f" % round(float(f_val / fract),2 ) def set_mode(self, mode): if mode == "lsb": set_mode_val = "\x66" elif mode == "usb": set_mode_val = "\x67" elif mode == "am": set_mode_val = "\x61" elif mode == "cw": set_mode_val = "\x65" elif mode == "data": set_mode_val = "\x64" elif mode == "fm": set_mode_val = "\x63" elif mode == "s-am": set_mode_val = "\x62" else: return "Mode not recognized" sendStr = '\x81' + '\x50' + '\x31' + '\x4d' + set_mode_val + '\x22' + '\x80' self.tx_rx(sendStr, False, 0) return "Mode sent" def get_mode(self): sendStr = "\x50" + "\x31" + "\x4d" m = self.tx_rx(sendStr, False, 1) mode = "" if m == "\x01": mode = "am" elif m == "\x02": mode = "s-am" elif m == "\x03": mode = "fm" elif m == "\x04": mode = "data" elif m == "\x05": mode = "cw" elif m == "\x06": mode = "lsb" elif m == "\x07": mode = "usb" return mode.upper() def get_s(self): sendStr = '\x2e' sraw = self.tx_rx(sendStr, True, 0) return ord(sraw) def get_cal(self): sendStr = '\x52' + '\x3f' + '\x44' + '\x11' #print "sending getcal" cbytes = self.tx_rx(sendStr, False, 8) cal = [] for c in cbytes: cal.append(ord(c)) return cal def get_smeter(self): s = float(self.get_s()) cal = self.get_cal() s1 = s - cal[0] s2 = s1 - cal[1] s3 = s2 - cal[2] s4 = s3 - cal[3] s5 = s4 - cal[4] s6 = s5 - cal[5] s7 = s6 - cal[6] if s1 <= 0: dbm = -123 adj = s / cal[0] * 10 return str(dbm + adj) elif s2 <= 0: dbm = -113 adj = s1 / cal[1] * 10 return str(dbm + adj) elif s3 <= 0: dbm = -103 adj = s2 / cal[2] * 10 return str(dbm + adj) elif s4 <= 0: dbm = -93 adj = s3 / cal[3] * 10 return str(dbm + adj) elif s5 <= 0: dbm = -83 adj = s4 / cal[4] * 10 return str(dbm + adj) elif s6 <= 0: dbm = -73 adj = s5 / cal[5] * 10 return str(dbm + adj) elif s7 <= 0: dbm = -63 adj = s6 / cal[6] * 20 return str(dbm + adj) else: dbm = -43 adj = s7 / cal[7] * 20 return str(dbm + adj) def tx_rx(self, sendStr, reply, n): # apply thread lock lock.acquire() self.ser.write(sendStr) if reply: # for reading S-meter result = "" while not result: result = self.ser.read(1) lock.release() return result else: result = "" byte = "" while n != 0: self.ser.write('\x71') #byte = "" while not byte: byte = self.ser.read(1) result += byte byte = "" n -= 1 lock.release() return result
Icom IC-M710 module
- m710.0y
''' Python NMEA Radio Functions module for Icom IC-M710 Marine HF SSB Transceiver Copyright (C) 2015 John Pumford-Green This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program. If not, see <http://www.gnu.org/licenses/>. ''' # serial port for Icom IC-M710 Rig Control import serial import threading import time version = "v0.1" sport = '/dev/ttyUSB0' sbaud = 4800 #ser = serial.Serial(sport, 4800, timeout=1) lock = threading.Lock() # NMEA Codes preamble = "$PICOA," controller = "90" radio = "01" cr = "\x0d" lf = "\x0a" # Commands are sent as NMEA private sentences: # # $PICOA,controller_id,radio_id,command,<parameter>,*HH<CR><LF> # # where HH is the 2 digit ECC value below: # # The protocol document states that for messages FROM the controller TO the radio # the ECC bytes are optional, and may be omitted. This appears to be false information, and # the ECC bytes seem to be necessary. # # The ECC checksum, is a two-digit hex value found by XORing the hex values of the characters # between "$" and "*" (but not including the $ or *) # # The first part of the message is always: # "PICOA,90,01," and this has an XOR value of (decimal) 112 # we then XOR this with each character's decimal ASCII value, in the required command # and convert the result to a 2-digit hex value # class m710(object): def __init__(self, model): self.ser = serial.Serial(sport, sbaud, timeout=0.1) self.model = model def get_ecc(self,command): ecc = 112 for c in command: ecc = ord(c) ^ ecc hecc = '{0:02x}'.format(int(ecc)) return hecc # we must force the radio into "Remote" mode before sending any other commands. def remote_on(self): command = "REMOTE,ON" ecc = self.get_ecc(command) sendStr = preamble+controller+","+radio+","+command+"*"+ecc+cr+lf result = self.tx_rx(sendStr) if result: return else: return self.remote_on() # we can leave the radio in "Remote" mode for as long as we want to control it remotely # but we must close the "Remote" mode when finished. The previous radio settings (channel/power etc) are # restored after Remote mode is closed. def remote_off(self): command = "REMOTE,OFF" ecc = self.get_ecc(command) sendStr = preamble+controller+","+radio+","+command+"*"+ecc+cr+lf result = self.tx_rx(sendStr) if result: #ser.close() return else: return self.remote_off() def ptt_on(): command = "TRX,TX" ecc = get_ecc(command) sendStr = preamble+controller+","+radio+","+command+"*"+ecc+cr+lf result = tx_rx(sendStr) if result: return else: return ptt_on() def ptt_off(): command = "TRX,RX" ecc = get_ecc(command) sendStr = preamble+controller+","+radio+","+command+"*"+ecc+cr+lf result = tx_rx(sendStr) if result: return else: return ptt_off() def get_freq(self): fkhz = self.get_rxfreq() #fkhz = "%.3f" % (float(f) * 1000) return fkhz def set_freq(self,freq): self.set_rxfreq(freq) self.set_txfreq(freq) return "Set freq success" def get_rxfreq(self): command = "RXF" ecc = self.get_ecc(command) sendStr = preamble+controller+","+radio+","+command+"*"+ecc+cr+lf result = self.tx_rx(sendStr) if result: list = result.split(",") f = list[4].split("*")[0] fkhz = "%.3f" % (float(f) * 1000) return fkhz else: return self.get_rxfreq() def get_txfreq(self): command = "TXF" ecc = get_ecc(command) sendStr = preamble+controller+","+radio+","+command+"*"+ecc+cr+lf result = self.tx_rx(sendStr) if result: list = result.split(",") f = list[4].split("*")[0] fkhz = "%.3f" % (float(f) * 1000) return fkhz else: return self.get_txfreq() def get_mode(self): command = "MODE" ecc = self.get_ecc(command) sendStr = preamble+controller+","+radio+","+command+"*"+ecc+cr+lf result = self.tx_rx(sendStr) if result: list = result.split(",") mode = list[4].split("*")[0] if mode == "J2B": mode = "DATA" return mode else: return self.get_mode() def set_mode(self,mode): mode = mode.upper() if mode == "DATA": mode = "J2B" command = "MODE,"+mode ecc = self.get_ecc(command) sendStr = preamble +controller+","+radio+"," + command + "*"+ecc+cr+lf #print "in set_mode with ", sendStr result = self.tx_rx(sendStr) if result: return result else: return self.set_mode(mode) def set_rxfreq(self,freq): fmhz = float(freq) / 1000 f = str(fmhz) command = "RXF,"+f ecc = self.get_ecc(command) sendStr = preamble +controller+","+radio+"," + command + "*"+ecc+cr+lf result = self.tx_rx(sendStr) if result: return result else: return self.set_rxfreq(freq) def set_txfreq(self,freq): fmhz = float(freq) / 1000 f = str(fmhz) command = "TXF,"+f ecc = self.get_ecc(command) sendStr = preamble +controller+","+radio+"," + command + "*"+ecc+cr+lf result = self.tx_rx(sendStr) if result: return result else: return self.set_txfreq(freq) def get_txpower(): command = "TXP" ecc = get_ecc(command) sendStr = preamble +controller+","+radio+"," + command + "*"+ecc+cr+lf result = tx_rx(sendStr) if result: list = result.split(",") power = list[4].split("*")[0] return power else: return get_txpower() def set_txpower(p): command = "TXP,"+p ecc = get_ecc(command) sendStr = preamble +controller+","+radio+"," + command + "*"+ecc+cr+lf result = tx_rx(sendStr) if result: return result else: return set_txpower(p) def get_smeter(self): command = "SIGM" ecc = self.get_ecc(command) sendStr = preamble+controller+","+radio+","+command+"*"+ecc+cr+lf result = self.tx_rx(sendStr) if result: list = result.split(",") smeter = (float(list[4].split("*")[0]) * 20) - 120 return smeter else: return self.get_smeter() def speaker_on(): command = "SP,ON" ecc = get_ecc(command) sendStr = preamble +controller+","+radio+"," + command + "*"+ecc+cr+lf result = tx_rx(sendStr) if result: return result else: return speaker_on() def speaker_off(): command = "SP,OFF" ecc = get_ecc(command) sendStr = preamble +controller+","+radio+"," + command + "*"+ecc+cr+lf result = tx_rx(sendStr) if result: return result else: return speaker_off() def sql_on(): command = "SQLC,ON" ecc = get_ecc(command) sendStr = preamble +controller+","+radio+"," + command + "*"+ecc+cr+lf result = tx_rx(sendStr) if result: return result else: return sql_on() def sql_off(): command = "SQLC,OFF" ecc = get_ecc(command) sendStr = preamble +controller+","+radio+"," + command + "*"+ecc+cr+lf result = tx_rx(sendStr) if result: return result else: return sql_off() def nb_on(): command = "NB,ON" ecc = get_ecc(command) sendStr = preamble +controller+","+radio+"," + command + "*"+ecc+cr+lf result = tx_rx(sendStr) if result: return result else: return nb_on() def nb_off(): command = "NB,OFF" ecc = get_ecc(command) sendStr = preamble +controller+","+radio+"," + command + "*"+ecc+cr+lf result = tx_rx(sendStr) if result: return result else: return nb_off() def dim_on(): command = "DIM,ON" ecc = get_ecc(command) sendStr = preamble +controller+","+radio+"," + command + "*"+ecc+cr+lf result = tx_rx(sendStr) if result: return result else: return dim_on() def dim_off(): command = "DIM,OFF" ecc = get_ecc(command) sendStr = preamble +controller+","+radio+"," + command + "*"+ecc+cr+lf result = tx_rx(sendStr) if result: return result else: return dim_off() def agc_on(): command = "AGC,ON" ecc = get_ecc(command) sendStr = preamble +controller+","+radio+"," + command + "*"+ecc+cr+lf result = tx_rx(sendStr) if result: return result else: return agc_on() def agc_off(): command = "AGC,OFF" ecc = get_ecc(command) sendStr = preamble +controller+","+radio+"," + command + "*"+ecc+cr+lf result = tx_rx(sendStr) if result: return result else: return agc_off() def get_vol(): command = "AFG" ecc = get_ecc(command) sendStr = preamble+controller+","+radio+","+command+"*"+ecc+cr+lf result = tx_rx(sendStr) if result: list = result.split(",") vol = list[4].split("*")[0] return vol else: return get_vol() def set_vol(v): command = "AFG,"+v ecc = get_ecc(command) sendStr = preamble +controller+","+radio+"," + command + "*"+ecc+cr+lf result = tx_rx(sendStr) if result: return result else: return set_vol(v) def get_rfg(): command = "RFG" ecc = get_ecc(command) sendStr = preamble+controller+","+radio+","+command+"*"+ecc+cr+lf result = tx_rx(sendStr) if result: list = result.split(",") rf = list[4].split("*")[0] return rf else: return get_rfg() def get_att(self): return def att_on(self): return def att_off(self): return def pre_on(self): return def pre_off(self): return def get_pre(self): return def set_rfg(v): command = "RFG,"+v ecc = get_ecc(command) sendStr = preamble +controller+","+radio+"," + command + "*"+ecc+cr+lf result = tx_rx(sendStr) if result: return result else: return set_rfg(v) # The message FROM the radio may be corrupted so we do a check on the received ECC versus our calculated cECC # from the received characters - which are everything AFTER the "$" and BEFORE the "*" # # starting the ECC calculation at message[1] ignores the "$" and we iterate through the message XORing each # character's ASCII (decimal) value until we have the final cECC which is converted by formatting it # into a two-digit hex value. This is then compared to the received (rECC) value from the radio's message. # If they match the message is acceptable and check_ecc() returns True to tx_rx(), # otherwise check_ecc() function returns "False" to tx_rx() # the True/False value is tested in tx_rx(). # If True then tx_rx() returns the incoming string to the calling function # otherwise it returns "False". # The calling function then checkS the boolean state of the returned value. # If True (ie it has the radio's message and the ECC was good) the message # is sent back to the client. # If False the function calls itself again, and attempts to get an error-free reply from the radio, via tx_rx() def check_ecc(self,message, recc): i = 1 cecc = 0 while i < len(message): cecc = ord(message[i]) ^ cecc i += 1 cecc = '{0:02x}'.format(int(cecc)) if cecc == recc: return True else: return False def tx_rx(self,sendStr): #print "in tx_rx, sendStr = " , sendStr lock.acquire() self.ser.write(sendStr) time.sleep(0.1) result = self.ser.readline() #print result reply = result.split("*") ecc = reply[1][0:2].lower() message = reply[0] lock.release() if self.check_ecc(message, ecc): return result else: return False #
Text-mode client
- client.py
__author__ = 'gm4slv' # client to work with server_oo.py # # # v0.1 import socket try: import readline except ImportError: pass import threading import time HOST, PORT = "localhost", 9999 smlog = "pymon.txt" log_active = [] def make_con(): global sock sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.connect((HOST, PORT)) def get_rnum(): global num global radios names = connect("listradios") radios = names.split() num = len(radios) return num def get_rname(i): r = radios[i] return r def list_radios(): num = get_rnum() global radio print "\r\nThere are currently %d radios connected." % num for i in range(0, num): r = get_rname(i) print "Radio %d is %s" % (i + 1, r) def get_lradio(): num = get_rnum() lradio = raw_input("Select radio to log: ").strip() if not lradio: print "Please select a radio" return False elif int(lradio) > num: print "Selected radio not recognized" return False else: return lradio def set_radio(): num = get_rnum() # global radio global radio_num global rname print "There are currently %d radios connected." % num for i in range(0, num): r = get_rname(i) print "Radio %d is %s" % (i + 1, r) radio = raw_input("Choose a radio number from the list : ").strip() try: if not radio: print "Please select a radio" return False, False elif int(radio) > num: print "Selected radio not recognized" return False, False except ValueError: print "Please enter a number" return False, False else: radio_num = "".join(("r", radio)) rname = get_rname(int(radio) - 1) return radio_num, rname def prompt(): print "" print "The available commands are:" print "lr : List Radios" print "sr : Select the Radio to control" print "gr : Get currently selected Radio name" print "gm : Get Mode" print "sm : Set Mode" print "gf : Get Freq" print "sf : Set Freq" print "gs : Get S-meter" print "gp : Get Pre-amp" print "pon : Set Pre-amp On" print "p2on : Set Pre-amp 2 On" print "poff : Set Pre-amp Off" print "gatt : Get Attn" print "aton : Set Attn On" print "atoff: Set Attn Off" print "ga : Get All (status of all radios)" print "sync : Sync freq/mode on two radios" print "log : Setup background logging to file" print "h : Help (show this command list)" print "q : quit" print "" def start(): global radio_num global rname global sock pfreq = connect("getfreq" + " " + radio_num) pmode = connect("getmode" + " " + radio_num) data = raw_input(rname + " (" + pfreq + " " + pmode + ") " + " > ").lower().strip() if len(data.split()) > 1: if (data.split())[0] == "sf": sfreq = (data.split())[1] freq = connect("setfreq" + " " + sfreq + " " + radio_num) print "%s replied: %s" % (rname, freq) start() elif (data.split())[0] == "sm": smode = (data.split())[1] mode = connect("setmode" + " " + smode + " " + radio_num) print "%s replied: %s" % (rname, mode) start() else: print "only one command at a time please" start() elif data == "u": oldf = connect("getfreq" + " " + radio_num) newf = str(float(oldf) + 1) freq = connect("setfreq" + " " + newf + " " + radio_num) print "%s replied: %s" % (rname, freq) start() elif data == "d": oldf = connect("getfreq" + " " + radio_num) newf = str(float(oldf) - 1) freq = connect("setfreq" + " " + newf + " " + radio_num) print "%s replied: %s" % (rname, freq) start() elif data == "lr": list_radios() start() elif data == "sr": radio_num, rname = set_radio() while not radio_num: radio_num, rname = set_radio() start() elif data == "gr": print "Radio selected is %s" % rname start() elif data == "gm": mode = connect("getmode" + " " + radio_num) print "%s replied: %s" % (rname, mode) start() elif data == "sm": smode = raw_input("Enter mode: ") mode = connect("setmode" + " " + smode + " " + radio_num) print "%s replied: %s" % (rname, mode) start() elif data == "gf": freq = connect("getfreq" + " " + radio_num) print "%s replied: %s kHz" % (rname, freq) start() elif data == "sf": sfreq = raw_input("Enter freq (kHz): ") freq = connect("setfreq" + " " + sfreq + " " + radio_num) print "%s replied: %s" % (rname, freq) start() elif data == "gs": smeter = connect("getsmeter" + " " + radio_num) print "%s replied: %sdBm" % (rname, smeter) start() elif data == "gp": preamp = connect("getpreamp" + " " + radio_num) print "%s replied: %s" % (rname, preamp) start() elif data == "pon": preamp = connect("preampon" + " " + radio_num) print "%s replied: %s" % (rname, preamp) start() elif data == "p2on": preamp = connect("preamp2on" + " " + radio_num) print "%s replied: %s" % (rname, preamp) start() elif data == "poff": preamp = connect("preampoff" + " " + radio_num) print "%s replied: %s" % (rname, preamp) start() elif data == "gatt": preamp = connect("getatt" + " " + radio_num) print "%s replied: %s" % (rname, preamp) start() elif data == "aton": att = connect("atton" + " " + radio_num) print "%s replied: %s" % (rname, att) start() elif data == "atoff": att = connect("attoff" + " " + radio_num) print "%s replied: %s" % (rname, att) start() elif data == "ga": get_all() start() elif data == "log": fname = raw_input("Enter a filename (or \"Return\" for default) :") if fname == "": fname = smlog # check file is valid try: f = open(fname, 'a+') f.close() except IOError: print "File/path not valid" start() list_radios() lradio = get_lradio() while not lradio: lradio = get_lradio() rname = get_rname(int(lradio) - 1) if lradio in log_active: print "Logging already active on " + rname else: tlog = int(raw_input("Enter a polling interval (seconds) :")) p = threading.Thread(target=log, args=(lradio, tlog, fname)) p.setDaemon(True) p.start() log_active.append(lradio) start() elif data == "sync": sync_result = sync() print sync_result start() elif data == "h" or data == "help": prompt() start() elif data == "q" or data == "quit": rx = connect("quit") print "Server says: %s " % rx else: #prompt() start() def get_all(): num = get_rnum() global radio print "There are currently %d radios connected." % num print "=" * 33 for i in range(0, num): r = get_rname(i) n = "".join(("r", str(i + 1))) freq = connect("getfreq" + " " + n) mode = connect("getmode" + " " + str(n)) smeter = connect("getsmeter" + " " + str(n)) preamp = connect("getpreamp" + " " + str(n)) att = connect("getatt" + " " + str(n)) print "Status of Radio %d (%s) \r\n" % (i + 1, r) print "Frequency : %s kHz" % freq print "Mode: %s" % mode print "S-Meter: %sdBm" % smeter print "Preamp = %s" % preamp print "Attenuator = %s " % att print "=" * 33 print "" def log(p, t, f): print "\n\n" tlog = t sradio = get_rname(int(p) - 1) sr = "".join(("r", str(p))) while True: try: frequency = connect("getfreq" + " " + sr + "\n") smeter = connect("getsmeter" + " " + sr + "\n") mode = connect("getmode" + " " + sr + "\n") write_file(f, sradio, mode, frequency, smeter) time.sleep(tlog) finally: pass def get_mradio(): num = get_rnum() mradio = raw_input("Choose Master radio number from the list: ").strip() if not mradio: print "Please select a radio" return False elif int(mradio) > num: print "Selected radio not recognized" return False else: return mradio def get_sradio(): num = get_rnum() sradio = raw_input("Choose Slave radio number from the list: ").strip() if not sradio: print "Please select a radio" return False elif int(sradio) > num: print "Selected radio not recognized" return False else: return sradio def sync(): num = get_rnum() print "" print "Set SLAVE to the same Frequency and Mode as MASTER.\r\n" print "Currently connected radios are:" for i in range(0, num): r = get_rname(i) print "%d is %s" % (i + 1, r) mradio = get_mradio() while not mradio: mradio = get_mradio() sradio = get_sradio() while not sradio: sradio = get_sradio() sr = "".join(("r", sradio)) mr = "".join(("r", mradio)) mfreq = connect("getfreq" + " " + mr) mmode = connect("getmode" + " " + mr) sfreq = connect("setfreq" + " " + mfreq + " " + sr) smode = connect("setmode" + " " + mmode + " " + sr) return (sfreq + "\r\n" + smode + "\r\n") # Try to send and receive in one-go, to prevent the logging thread and the main prog # getting the wrong receive data def connect(data): try: lock.acquire() global sock sock.sendall(data + "\n") received = sock.recv(2048) finally: lock.release() return received def write_file(fname, rname, mode, freq, smeter): filename = fname f = open(filename, 'a+') timenow = time.strftime("%d/%m/%Y %H:%M:%S", time.gmtime(time.time())) log = " ".join((timenow, rname, mode, freq, smeter, "\r\n")) f.write(log) f.close() make_con() lock = threading.Lock() get_all() print "Please choose a radio\r\n" radio_num, rname = set_radio() while not radio_num: radio_num, rname = set_radio() prompt() start()
Example session
There are currently 3 radios connected. ================================= Status of Radio 1 (IC-R75) Frequency : 2187.500 kHz Mode: DATA S-Meter: -114.2dBm Preamp = 0 Attenuator = 0 ================================= Status of Radio 2 (IC-M710) Frequency : 16804.500 kHz Mode: DATA S-Meter: -120.0dBm Preamp = None Attenuator = None ================================= Status of Radio 3 (AR7030) Frequency : 2182.000 kHz Mode: USB S-Meter: -110.3dBm Preamp = 0 Attenuator = 0 ================================= Please choose a radio There are currently 3 radios connected. Radio 1 is IC-R75 Radio 2 is IC-M710 Radio 3 is AR7030 Choose a radio number from the list : 3 The available commands are: lr : List Radios sr : Select the Radio to control gr : Get currently selected Radio name gm : Get Mode sm : Set Mode gf : Get Freq sf : Set Freq gs : Get S-meter gp : Get Pre-amp pon : Set Pre-amp On p2on : Set Pre-amp 2 On poff : Set Pre-amp Off gatt : Get Attn aton : Set Attn On atoff: Set Attn Off ga : Get All (status of all radios) sync : Sync freq/mode on two radios log : Setup background logging to file h : Help (show this command list) q : quit AR7030 (2182.000 USB) > sf 5680 AR7030 replied: Freq Set AR7030 (5680.000 USB) >
GUI Client
No S-meter version
- gui_nosmeter.py
# # new class-based gui from Tkinter import * import socket import threading import time class Network(object): def __init__(self): self.sock = self.make_con() def make_con(self): self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.sock.connect(("192.168.21.107", 9999)) return self.sock def connect(self, data): try: lock.acquire() self.sock.sendall(data + "\n") self.received = self.sock.recv(2048) finally: lock.release() return self.received class Dash(object): def __init__(self, master): self.master = master self.s = (n1.sock.getsockname()) self.p = (n1.sock.getpeername()) dash_frame = Toplevel(master, borderwidth = 2, relief = GROOVE, bg = 'black') dash_frame.title("Server") dash_frame.protocol("WM_DELETE_WINDOW", self.handler) dash_frame.resizable(0, 0) dash_frame.geometry("300x150+10-10") dash_frame.grid() #self.utc_time = StringVar() #Label(dash_frame, textvariable = self.utc_time, fg = 'green', bg = 'black').grid(row = 0, column = 0, sticky = W) Label(dash_frame, text = self.s, bg = 'black', fg = 'white').grid(row = 1, column = 0, sticky = W) Label(dash_frame, text = " < --- > ", bg = 'black', fg = 'white').grid(row = 1, column = 1) Label(dash_frame, text = self.p, bg = 'black', fg = 'white').grid(row = 1, column = 2, sticky = E) self.server_msg_l = StringVar() Label(dash_frame, textvariable = self.server_msg_l, fg = 'yellow', bg = 'black').grid(row = 2, column = 0, columnspan = 3, sticky = W) q_button = Button(dash_frame, text = "Quit", command = lambda: close()) q_button.grid(row = 3, column = 0, sticky = W) def handler(self): pass def up_dash(self): self.server_msg = n1.connect("ident") self.server_msg_l.set(self.server_msg) #self.utc_time.set(time.strftime("%d/%m/%Y %H:%M", time.gmtime(time.time()))) return class nRadio(object): def __init__(self, master, radio): self.master = master self.radio = radio self.num = "".join(("r", self.radio)) radio_frame = Frame(master, borderwidth=2, relief=GROOVE) radio_frame.grid() self.smeter_list = [] Label(radio_frame, text="Name", width=10).grid(row=0, column=0) Label(radio_frame, text="Freq/kHz", width=10).grid(row=0, column=1) Label(radio_frame, text="Mode", width=10).grid(row=0, column=2) #Label(radio_frame, text="Signal/dBm", width=10).grid(row=0, column=3) #Label(radio_frame, text="Max/dBm", width=10, fg='red').grid(row=0, column=4) #Label(radio_frame, text="Ave/dBm", width=10, fg='green').grid(row=0, column=5) #Label(radio_frame, text="Min/dBm", width=10, fg='blue').grid(row=0, column=6) self.name_l = StringVar() self.l_name = Label(radio_frame, textvariable=self.name_l, width=10) self.l_name.grid(row=1, column=0, sticky=W) self.freq_l = StringVar() self.l_freq = Label(radio_frame, fg='red', textvariable=self.freq_l, width=10) self.l_freq.grid(row=1, column=1, sticky=E) self.mode_l = StringVar() self.l_mode = Label(radio_frame, textvariable=self.mode_l, width=10) self.l_mode.grid(row=1, column=2, sticky=E) ''' self.e_mode = Entry(radio_frame, width=10, bg = 'white', fg = 'black', insertbackground = 'blue') self.e_mode.grid(row=2, column=2) self.e_mode.bind('<Return>', (lambda event: self.set_mode(self.num))) ''' #self.e_mode = StringVar() self.b_mode_usb = Button(radio_frame, text = "USB", width = 4, command = lambda: self.set_mode("usb", self.num)) self.b_mode_usb.grid(row = 3, column = 0) self.b_mode_lsb = Button(radio_frame, text = "LSB", width = 4, command = lambda: self.set_mode("lsb", self.num)) self.b_mode_lsb.grid(row = 3, column = 1) self.b_mode_cw = Button(radio_frame, text = "CW", width = 4, command = lambda: self.set_mode("cw", self.num)) self.b_mode_cw.grid(row = 4, column = 0) self.b_mode_am = Button(radio_frame, text = "AM", width = 4, command = lambda: self.set_mode("am", self.num)) self.b_mode_am.grid(row = 4, column = 1) self.b_mode_data = Button(radio_frame, text = "Data", width = 4, command = lambda: self.set_mode("data", self.num)) self.b_mode_data.grid(row = 3, column = 2) self.b_qsy_up = Button(radio_frame, text = "+1kHz", width = 6, command = lambda: self.qsy_up( self.num)) self.b_qsy_up.grid(row = 3, column = 3) self.b_qsy_down = Button(radio_frame, text = "-1kHz", width = 6, command = lambda: self.qsy_down( self.num)) self.b_qsy_down.grid(row = 4, column = 3) self.b_update = Button(radio_frame, text = "Get", width = 6, command = lambda:self.get_all()) self.b_update.grid(row = 0, column = 3) self.smeter_l = StringVar() self.l_smeter = Label(radio_frame, textvariable=self.smeter_l, width=10) #self.l_smeter.grid(row=1, column=3, sticky=E) self.max_var = StringVar() self.l_max = Label(radio_frame, textvariable=self.max_var, width=10) #self.l_max.grid(row=1, column=4) self.ave_var = StringVar() self.l_average = Label(radio_frame, textvariable=self.ave_var, width=10) #self.l_average.grid(row=1, column=5) self.min_var = StringVar() self.l_min = Label(radio_frame, textvariable=self.min_var, width=10) #self.l_min.grid(row=1, column=6) self.e_freq = Entry(radio_frame, width=10, bg = 'white', fg = 'black', insertbackground = 'blue') self.e_freq.grid(row=2, column=1) self.e_freq.focus() self.e_freq.bind('<Return>', (lambda event: self.set_freq(self.num))) self.pre = IntVar() self.cb_pre = Checkbutton(radio_frame, variable=self.pre, command=lambda: self.preamp_onoff(self.num)) self.cb_pre.grid(row=5, column=3, sticky=E) self.l_pre = Label(radio_frame, width=10, text="Preamp") self.l_pre.grid(row=5, column=2, sticky=W) self.att = IntVar() self.cb_att = Checkbutton(radio_frame, variable=self.att, command=lambda: self.att_onoff(self.num)) self.cb_att.grid(row=6, column=3, sticky=E) self.l_att = Label(radio_frame, width=10, text="Att") self.l_att.grid(row=6, column=2, sticky=W) self.set_log = IntVar() self.cb_log = Checkbutton(radio_frame, variable = self.set_log) #self.cb_log.grid(row = 6, column = 1, sticky = W) self.l_log = Label(radio_frame, width = 10, text = "Log to file...") #self.l_log.grid(row = 6, column = 0, sticky = E) self.c1 = Canvas(radio_frame, width=260, height=100, bg='black') #self.c1.grid(row=2, column=3, columnspan=4, rowspan=3) name = self.get_name(radio) def get_all(self): self.get_freq(self.num) self.get_mode(self.num) #self.get_smeter(self.num) self.get_preamp(self.num) self.get_atten(self.num) #self.graph_points() #self.avg() #self.max() #self.min() if self.set_log.get() == 1: if int(time.time()) % 10.0 == 0: self.write_file() return def get_freq(self, radio): self.freq = n1.connect("getfreq" + " " + radio) self.freq_l.set(self.freq) return def set_freq(self, radio): self.freq = str(self.e_freq.get()) self.newfreq = n1.connect("setfreq" + " " + self.freq + " " + radio) self.e_freq.delete(0, END) self.get_all() return def qsy_up(self,radio): oldf = n1.connect("getfreq" + " " + radio) newf = str(float(oldf) + 1) freq = n1.connect("setfreq" + " " + newf + " " + radio) self.get_all() return def qsy_down(self,radio): oldf = n1.connect("getfreq" + " " + radio) newf = str(float(oldf) - 1) freq = n1.connect("setfreq" + " " + newf + " " + radio) self.get_all() return def get_mode(self, radio): self.mode = n1.connect("getmode" + " " + radio) self.mode_l.set(self.mode) return def set_mode(self, mode, radio): #self.mode = str(self.e_mode.get()) self.newmode = n1.connect("setmode" + " " + mode + " " + radio) #self.e_mode.delete(0, END) self.get_all() return def get_smeter(self, radio): self.smeter = n1.connect("getsmeter" + " " + radio) self.smeter_l.set(self.smeter) self.smeter_list.append(float(self.smeter)) if len(self.smeter_list) > 120: self.smeter_list.pop(0) return def avg(self): s = self.smeter_list total = 0 for i in s: total += i self.av = round((total / len(s)), 1) self.ave_var.set(str(self.av)) self.c1.create_line(0, 100 - (self.av + 123 + 5), 310, 100 - (self.av + 123 + 5), fill="green") return self.av def min(self): s = self.smeter_list self.mn = s[0] for i in s: if i < self.mn: self.mn = i self.min_var.set(str(self.mn)) self.c1.create_line(0, 100 - (self.mn + 123 + 5), 310, 100 - (self.mn + 123 + 5), fill="blue") return self.mn def max(self): s = self.smeter_list self.mx = s[0] for i in s: if i > self.mx: self.mx = i self.max_var.set(str(self.mx)) self.c1.create_line(0, 100 - (self.mx + 123 + 5), 310, 100 - (self.mx + 123 + 5), fill="red") return self.mx def get_preamp(self, radio): self.preamp = n1.connect("getpreamp" + " " + radio) if self.preamp == "1": self.cb_pre.select() elif self.preamp == "0": self.cb_pre.deselect() return def preamp_onoff(self, radio): self.prestate = self.pre.get() if self.prestate: n1.connect("preampon" + " " + radio) else: n1.connect("preampoff" + " " + radio) self.get_all() return def att_onoff(self, radio): self.attstate = self.att.get() if self.attstate: n1.connect("atton" + " " + radio) else: n1.connect("attoff" + " " + radio) self.get_all() return def get_atten(self, radio): self.atten = n1.connect("getatt" + " " + radio) if self.atten == "1": self.cb_att.select() elif self.atten == "0": self.cb_att.deselect() return def get_name(self, radio): print radio self.all_names = n1.connect("listradios") self.radios = self.all_names.split() print self.radios self.name = self.radios[int(radio) - 1] self.name_l.set(self.name) return def graph_points(self): seq = self.smeter_list y_stretch = 1 y_gap = 5 x_stretch = 0 x_width = 2 x_gap = 2 height = 100 self.c1.delete(ALL) self.c1.create_line(0, 100 - (-73 + 123 + 5), 310, 100 - (-73 + 123 + 5), fill='white') for x, y in enumerate(seq): yd = y + 123 x0 = x * x_stretch + x * x_width + x_gap y0 = height - (yd * y_stretch + y_gap) x1 = x * x_stretch + x * x_width + x_width + x_gap y1 = height - y_gap self.c1.create_rectangle(x0, y0, x1, y1, fill="yellow") def write_file(self): self.filename = self.name+".txt" self.f = open(self.filename, 'a+') timenow = time.strftime("%d/%m/%Y %H:%M:%S", time.gmtime(time.time())) log = " ".join((timenow, self.name, self.mode, self.freq, self.smeter, self.preamp, self.atten, "\r\n")) self.f.write(log) self.f.close() def update(): #loops = 0 try: nRadio1.get_all() except: pass try: nRadio2.get_all() except: pass try: nRadio3.get_all() except: pass try: nRadio4.get_all() except: pass d1.up_dash() return def main(): #loops = 0 while True: try: nRadio1.get_all() except: pass try: nRadio2.get_all() except: pass try: nRadio3.get_all() except: pass try: nRadio4.get_all() except: pass d1.up_dash() #loops += 1 #print threading.currentThread().name, loops time.sleep(5) def close(): n1.connect("quit") root.destroy() if __name__ == "__main__": version = "v0.4" lock = threading.Lock() root = Tk() #w, h = root.winfo_screenwidth(), root.winfo_screenheight() #root.geometry("%dx%d+0+0" % (w, h)) #root.geometry("300x520+0+0") root.title("Netradio " + version) #root.withdraw() n1 = Network() #radio_count = (n1.connect("count")) #radio_count = 3 radio_count = int(n1.connect("count")) if radio_count > 0: nRadio1 = nRadio(root, "1") if radio_count > 1: nRadio2 = nRadio(root, "2") if radio_count > 2: nRadio3 = nRadio(root, "3") if radio_count > 3: nRadio4 = nRadio(root, "4") d1 = Dash(root) #print threading.currentThread().name #m1 = threading.Thread(target = main) #m1.setDaemon(True) #m1.start() update() root.mainloop()
S-meter display version
- gui.py
# # new class-based gui from Tkinter import * import socket import threading import time class Network(object): def __init__(self): self.sock = self.make_con() def make_con(self): self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.sock.connect(("192.168.21.107", 9999)) return self.sock def connect(self, data): try: lock.acquire() self.sock.sendall(data + "\n") self.received = self.sock.recv(2048) finally: lock.release() return self.received class Dash(object): def __init__(self, master): self.master = master self.s = (n1.sock.getsockname()) self.p = (n1.sock.getpeername()) dash_frame = Toplevel(master, borderwidth = 2, relief = GROOVE, bg = 'black') dash_frame.title("Server") dash_frame.protocol("WM_DELETE_WINDOW", self.handler) dash_frame.resizable(0, 0) dash_frame.geometry("300x150-10+10") dash_frame.grid() self.utc_time = StringVar() Label(dash_frame, textvariable = self.utc_time, fg = 'green', bg = 'black').grid(row = 0, column = 0, sticky = W) Label(dash_frame, text = self.s, bg = 'black', fg = 'white').grid(row = 1, column = 0, sticky = W) Label(dash_frame, text = " < --- > ", bg = 'black', fg = 'white').grid(row = 1, column = 1) Label(dash_frame, text = self.p, bg = 'black', fg = 'white').grid(row = 1, column = 2, sticky = E) self.server_msg_l = StringVar() Label(dash_frame, textvariable = self.server_msg_l, fg = 'yellow', bg = 'black').grid(row = 2, column = 0, columnspan = 3, sticky = W) q_button = Button(dash_frame, text = "Quit", command = lambda: close()) q_button.grid(row = 3, column = 0, sticky = W) def handler(self): pass def up_dash(self): self.server_msg = n1.connect("ident") self.server_msg_l.set(self.server_msg) self.utc_time.set(time.strftime("%d/%m/%Y %H:%M", time.gmtime(time.time()))) return class nRadio(object): def __init__(self, master, radio): self.master = master self.radio = radio self.num = "".join(("r", self.radio)) radio_frame = Frame(master, borderwidth=2, relief=GROOVE) radio_frame.grid() self.smeter_list = [] Label(radio_frame, text="Name", width=10).grid(row=0, column=0) Label(radio_frame, text="Freq/kHz", width=10).grid(row=0, column=1) Label(radio_frame, text="Mode", width=10).grid(row=0, column=2) Label(radio_frame, text="Signal/dBm", width=10).grid(row=0, column=3) Label(radio_frame, text="Max/dBm", width=10, fg='red').grid(row=0, column=4) Label(radio_frame, text="Ave/dBm", width=10, fg='green').grid(row=0, column=5) Label(radio_frame, text="Min/dBm", width=10, fg='blue').grid(row=0, column=6) self.name_l = StringVar() self.l_name = Label(radio_frame, textvariable=self.name_l, width=10) self.l_name.grid(row=1, column=0, sticky=W) self.freq_l = StringVar() self.l_freq = Label(radio_frame, fg='red', textvariable=self.freq_l, width=10) self.l_freq.grid(row=1, column=1, sticky=E) self.mode_l = StringVar() self.l_mode = Label(radio_frame, textvariable=self.mode_l, width=10) self.l_mode.grid(row=1, column=2, sticky=E) ''' self.e_mode = Entry(radio_frame, width=10, bg = 'white', fg = 'black', insertbackground = 'blue') self.e_mode.grid(row=2, column=2) self.e_mode.bind('<Return>', (lambda event: self.set_mode(self.num))) ''' #self.e_mode = StringVar() self.b_mode_usb = Button(radio_frame, text = "USB", width = 4, command = lambda: self.set_mode("usb", self.num)) self.b_mode_usb.grid(row = 3, column = 0) self.b_mode_lsb = Button(radio_frame, text = "LSB", width = 4, command = lambda: self.set_mode("lsb", self.num)) self.b_mode_lsb.grid(row = 3, column = 1) self.b_mode_cw = Button(radio_frame, text = "CW", width = 4, command = lambda: self.set_mode("cw", self.num)) self.b_mode_cw.grid(row = 4, column = 0) self.b_mode_am = Button(radio_frame, text = "AM", width = 4, command = lambda: self.set_mode("am", self.num)) self.b_mode_am.grid(row = 4, column = 1) self.b_mode_data = Button(radio_frame, text = "Data", width = 4, command = lambda: self.set_mode("data", self.num)) self.b_mode_data.grid(row = 3, column = 2) self.smeter_l = StringVar() self.l_smeter = Label(radio_frame, textvariable=self.smeter_l, width=10) self.l_smeter.grid(row=1, column=3, sticky=E) self.max_var = StringVar() self.l_max = Label(radio_frame, textvariable=self.max_var, width=10) self.l_max.grid(row=1, column=4) self.ave_var = StringVar() self.l_average = Label(radio_frame, textvariable=self.ave_var, width=10) self.l_average.grid(row=1, column=5) self.min_var = StringVar() self.l_min = Label(radio_frame, textvariable=self.min_var, width=10) self.l_min.grid(row=1, column=6) self.e_freq = Entry(radio_frame, width=10, bg = 'white', fg = 'black', insertbackground = 'blue') self.e_freq.grid(row=2, column=1) self.e_freq.focus() self.e_freq.bind('<Return>', (lambda event: self.set_freq(self.num))) self.pre = IntVar() self.cb_pre = Checkbutton(radio_frame, variable=self.pre, command=lambda: self.preamp_onoff(self.num)) self.cb_pre.grid(row=5, column=4, sticky=E) self.l_pre = Label(radio_frame, width=10, text="Preamp") self.l_pre.grid(row=5, column=3, sticky=W) self.att = IntVar() self.cb_att = Checkbutton(radio_frame, variable=self.att, command=lambda: self.att_onoff(self.num)) self.cb_att.grid(row=6, column=4, sticky=E) self.l_att = Label(radio_frame, width=10, text="Att") self.l_att.grid(row=6, column=3, sticky=W) self.set_log = IntVar() self.cb_log = Checkbutton(radio_frame, variable = self.set_log) self.cb_log.grid(row = 6, column = 1, sticky = W) self.l_log = Label(radio_frame, width = 10, text = "Log to file...") self.l_log.grid(row = 6, column = 0, sticky = E) self.c1 = Canvas(radio_frame, width=260, height=100, bg='black') self.c1.grid(row=2, column=3, columnspan=4, rowspan=3) name = self.get_name(radio) def get_all(self): self.get_freq(self.num) self.get_mode(self.num) self.get_smeter(self.num) self.get_preamp(self.num) self.get_atten(self.num) self.graph_points() self.avg() self.max() self.min() if self.set_log.get() == 1: if int(time.time()) % 10.0 == 0: self.write_file() return def get_freq(self, radio): self.freq = n1.connect("getfreq" + " " + radio) self.freq_l.set(self.freq) return def set_freq(self, radio): self.freq = str(self.e_freq.get()) self.newfreq = n1.connect("setfreq" + " " + self.freq + " " + radio) self.e_freq.delete(0, END) return def get_mode(self, radio): self.mode = n1.connect("getmode" + " " + radio) self.mode_l.set(self.mode) return def set_mode(self, mode, radio): #self.mode = str(self.e_mode.get()) self.newmode = n1.connect("setmode" + " " + mode + " " + radio) #self.e_mode.delete(0, END) return def get_smeter(self, radio): self.smeter = n1.connect("getsmeter" + " " + radio) self.smeter_l.set(self.smeter) self.smeter_list.append(float(self.smeter)) if len(self.smeter_list) > 120: self.smeter_list.pop(0) return def avg(self): s = self.smeter_list total = 0 for i in s: total += i self.av = round((total / len(s)), 1) self.ave_var.set(str(self.av)) self.c1.create_line(0, 100 - (self.av + 123 + 5), 310, 100 - (self.av + 123 + 5), fill="green") return self.av def min(self): s = self.smeter_list self.mn = s[0] for i in s: if i < self.mn: self.mn = i self.min_var.set(str(self.mn)) self.c1.create_line(0, 100 - (self.mn + 123 + 5), 310, 100 - (self.mn + 123 + 5), fill="blue") return self.mn def max(self): s = self.smeter_list self.mx = s[0] for i in s: if i > self.mx: self.mx = i self.max_var.set(str(self.mx)) self.c1.create_line(0, 100 - (self.mx + 123 + 5), 310, 100 - (self.mx + 123 + 5), fill="red") return self.mx def get_preamp(self, radio): self.preamp = n1.connect("getpreamp" + " " + radio) if self.preamp == "1": self.cb_pre.select() elif self.preamp == "0": self.cb_pre.deselect() return def preamp_onoff(self, radio): self.prestate = self.pre.get() if self.prestate: n1.connect("preampon" + " " + radio) else: n1.connect("preampoff" + " " + radio) return def att_onoff(self, radio): self.attstate = self.att.get() if self.attstate: n1.connect("atton" + " " + radio) else: n1.connect("attoff" + " " + radio) return def get_atten(self, radio): self.atten = n1.connect("getatt" + " " + radio) if self.atten == "1": self.cb_att.select() elif self.atten == "0": self.cb_att.deselect() return def get_name(self, radio): print radio self.all_names = n1.connect("listradios") self.radios = self.all_names.split() print self.radios self.name = self.radios[int(radio) - 1] self.name_l.set(self.name) return def graph_points(self): seq = self.smeter_list y_stretch = 1 y_gap = 5 x_stretch = 0 x_width = 2 x_gap = 2 height = 100 self.c1.delete(ALL) self.c1.create_line(0, 100 - (-73 + 123 + 5), 310, 100 - (-73 + 123 + 5), fill='white') for x, y in enumerate(seq): yd = y + 123 x0 = x * x_stretch + x * x_width + x_gap y0 = height - (yd * y_stretch + y_gap) x1 = x * x_stretch + x * x_width + x_width + x_gap y1 = height - y_gap self.c1.create_rectangle(x0, y0, x1, y1, fill="yellow") def write_file(self): self.filename = self.name+".txt" self.f = open(self.filename, 'a+') timenow = time.strftime("%d/%m/%Y %H:%M:%S", time.gmtime(time.time())) log = " ".join((timenow, self.name, self.mode, self.freq, self.smeter, self.preamp, self.atten, "\r\n")) self.f.write(log) self.f.close() def main(): #loops = 0 while True: try: nRadio1.get_all() except: pass try: nRadio2.get_all() except: pass try: nRadio3.get_all() except: pass try: nRadio4.get_all() except: pass d1.up_dash() #loops += 1 #print threading.currentThread().name, loops time.sleep(1) def close(): n1.connect("quit") root.destroy() if __name__ == "__main__": version = "v0.3" lock = threading.Lock() root = Tk() #w, h = root.winfo_screenwidth(), root.winfo_screenheight() #root.geometry("%dx%d+0+0" % (w, h)) root.geometry("550x600+0+0") root.title("GM4SLV Radio Controller " + version) #root.withdraw() n1 = Network() #radio_count = (n1.connect("count")) #radio_count = 3 radio_count = int(n1.connect("count")) if radio_count > 0: nRadio1 = nRadio(root, "1") if radio_count > 1: nRadio2 = nRadio(root, "2") if radio_count > 2: nRadio3 = nRadio(root, "3") if radio_count > 3: nRadio4 = nRadio(root, "4") d1 = Dash(root) #print threading.currentThread().name m1 = threading.Thread(target = main) m1.setDaemon(True) m1.start() root.mainloop()
Further Information
public/radio/dormant/netradio_control.txt · Last modified: 06/03/25 06:49 GMT by 127.0.0.1