#!/usr/bin/env python
import serial
import logging
import threading
import time
import curses
import Queue
# Shared shutdown flag. The run() loops of the thread classes below poll it on
# every iteration and stop once it is true; KeyboardThread is the only code
# in this file that sets it.
g_exit_threads = False
class SerialPortThread(threading.Thread):
    """Pump data from the serial port into a queue.

    Each loop iteration issues one ``read()`` on the port and, when something
    non-empty comes back, puts it on the queue.  The loop ends once the
    module-level ``g_exit_threads`` flag becomes true.
    """

    def __init__(self, port, logger, queue):
        """Store the collaborators used by ``run()``.

        port   -- object with a ``read()`` method (a ``serial.Serial`` in
                  this script)
        logger -- object with an ``info()`` method
        queue  -- object with a ``put()`` method; receives whatever was read
        """
        super(SerialPortThread, self).__init__()
        self._queue = queue
        self._logger = logger
        self._port = port

    def run(self):
        """Thread body: read from the port until shutdown is requested."""
        log = self._logger.info
        log("SerialPortThread.run. ")
        while True:
            if g_exit_threads:
                break
            received = self._port.read()
            # An empty result means nothing was read; skip it.
            if received:
                self._queue.put(received)
        log("SerialPortThread. Exiting ")
class KeyboardThread(threading.Thread):
    """Forward keys typed on the curses screen to the serial port.

    Every single-character key is written to the port exactly as typed.  The
    thread also watches for the byte sequence ESC, 'O', 'P' and, once it has
    been typed in full, sets the module-level ``g_exit_threads`` flag so the
    polling loops stop.  The sequence itself is still forwarded to the port.
    """

    # Key codes that, typed in this order, end the session (ESC, 'O', 'P').
    # NOTE(review): presumably what the terminal sends for F1 while keypad
    # translation is off -- confirm against the terminals in use.
    _EXIT_SEQUENCE = (27, 79, 80)

    # Seconds to wait before polling the keyboard again when it is idle.
    _IDLE_DELAY = 0.1

    def __init__(self, screen, logger, port):
        """Store the collaborators used by ``run()``.

        screen -- curses window providing ``getkey()``
        logger -- object with an ``info()`` method
        port   -- object with a ``write()`` method (a ``serial.Serial`` in
                  this script)
        """
        threading.Thread.__init__(self)
        self._screen = screen
        self._logger = logger
        self._port = port

    def run(self):
        """Thread body: relay keys until the exit sequence or shutdown."""
        global g_exit_threads
        self._logger.info("KeyboardThread.Run")
        matched = 0  # how many leading codes of _EXIT_SEQUENCE were just typed
        while not g_exit_threads:
            try:
                key = self._screen.getkey()
            except curses.error:
                # In no-delay mode getkey() raises when no key is pending.
                # That is the normal idle case, not a failure.
                time.sleep(self._IDLE_DELAY)
                continue

            self._logger.info("KeyboardThread.run. Event: >>" + key + "<<")

            if len(key) != 1:
                # getkey() reports special keys as a multi-character key name;
                # that text is not input for the device and ord() would reject
                # it, so it is only logged.
                matched = 0
                continue

            self._port.write(key)

            code = ord(key)
            if code == self._EXIT_SEQUENCE[matched]:
                matched += 1
                if matched == len(self._EXIT_SEQUENCE):
                    self._logger.info("KeyboardThread. Exiting")
                    g_exit_threads = True
            elif code == self._EXIT_SEQUENCE[0]:
                # A fresh ESC restarts the match instead of discarding it.
                matched = 1
            else:
                matched = 0
class OutputThread(threading.Thread):
    """Draw characters taken from a queue on the bottom line of the screen.

    A newline scrolls the screen and restarts the line, backspace (code 8)
    blanks the previous cell, bell (code 7) flashes the screen, and anything
    else is written at the current column.  The loop ends once the
    module-level ``g_exit_threads`` flag becomes true.
    """

    # Column used for the very first line and the floor for backspace.
    _FIRST_COLUMN = 2
    # Column used after a line break.
    # NOTE(review): this differs from _FIRST_COLUMN, so the cell in column 1
    # of later lines cannot be erased with backspace.  Both values are kept
    # from the original code -- confirm which one was intended.
    _NEWLINE_COLUMN = 1
    # Seconds to wait on the queue before re-checking the shutdown flag.
    _POLL_TIMEOUT = 0.1
    _BACKSPACE = 8
    _BELL = 7

    def __init__(self, screen, logger, queue):
        """Store the collaborators used by ``run()``.

        screen -- curses window providing ``getmaxyx()``, ``addstr()`` and
                  ``scroll()``
        logger -- object with an ``info()`` method
        queue  -- object with a ``get(timeout=...)`` method yielding the
                  characters to display
        """
        threading.Thread.__init__(self)
        self._screen = screen
        self._logger = logger
        self._queue = queue

    def _start_new_line(self):
        """Scroll the screen up one line and return the column to resume at."""
        self._screen.scroll()
        return self._NEWLINE_COLUMN

    def run(self):
        """Thread body: render queued characters until shutdown."""
        # Imported here so the block works with either spelling of the
        # standard queue module (Python 3 ``queue``, Python 2 ``Queue``).
        try:
            from queue import Empty
        except ImportError:
            from Queue import Empty

        self._logger.info("OutputThread.Run")
        y, x = self._screen.getmaxyx()
        row = y - 1  # everything is drawn on the last screen line
        pos = self._FIRST_COLUMN
        while not g_exit_threads:
            try:
                e = self._queue.get(timeout=self._POLL_TIMEOUT)
            except Empty:
                # Nothing arrived within the timeout; loop to re-check the
                # shutdown flag instead of letting the exception end the thread.
                continue

            code = ord(e[0])
            self._logger.info("OutputThread: " + str(code))

            if e == '\n':
                pos = self._start_new_line()
                continue

            self._logger.info("Char: " + str(code))
            if code == self._BACKSPACE:
                if pos > self._FIRST_COLUMN:
                    pos -= 1
                    self._screen.addstr(row, pos, ' ')
            elif code == self._BELL:
                curses.flash()
            else:
                # Wrap before the last column: addstr() fails past the right
                # edge, and the bottom-right cell is left unused on purpose.
                if pos >= x - 1:
                    pos = self._start_new_line()
                self._screen.addstr(row, pos, e)
                pos += 1
        self._logger.info("OutputThread. Exiting")
def main(argv=None):
    """Run the serial terminal until the keyboard thread requests shutdown.

    argv -- command line as a list; defaults to ``sys.argv``.  ``argv[1]``
            is the serial device to open (for example ``/dev/ttyUSB0``).

    Exits with a usage message when no device is given.
    """
    import sys  # local import: only this entry point needs it

    if argv is None:
        argv = sys.argv
    if len(argv) < 2:
        sys.exit("usage: %s SERIAL_PORT" % argv[0])
    serial_port = argv[1]

    # Log to a file: curses owns the terminal, so log lines written to it
    # would corrupt the display.
    # NOTE(review): the original basicConfig() call was cut off after
    # ``level=``; the file name below is a stand-in -- adjust as needed.
    logging.basicConfig(
        format='%(asctime)-6s: %(name)s - %(levelname)s - %(message)s',
        level=logging.DEBUG,
        filename="serial_terminal.log")

    # Open the port before touching the terminal so a failure to open it is
    # reported on a normal, un-initialised screen.
    port = serial.Serial(serial_port, 115200, timeout=0.1)
    try:
        screen = curses.initscr()
        try:
            curses.noecho()
            curses.curs_set(0)
            curses.raw()
            screen.keypad(0)
            screen.clear()
            screen.nodelay(True)
            y, x = screen.getmaxyx()
            screen.setscrreg(1, y - 1)
            screen.scrollok(True)

            queue = Queue.Queue()
            threads = [
                SerialPortThread(port, logging, queue),
                KeyboardThread(screen, logging, port),
                OutputThread(screen, logging, queue),
            ]
            for t in threads:
                t.daemon = True
                t.start()
            # Wait for all the threads to finish.
            for t in threads:
                t.join()
        finally:
            # Always hand the terminal back in a usable state.
            curses.endwin()
    finally:
        port.close()


if __name__ == "__main__":
    main()
domingo, 11 de agosto de 2013
A serial terminal in Python using curses
I wrote this small program to interact with a device from Python. It is basically a terminal emulator built with Python and curses. Why write yet another terminal program? In my case, so that I can extend it: the plan is to have it parse the device's output and control other systems.
Here is the code:
Suscribirse a:
Enviar comentarios (Atom)
No hay comentarios:
Publicar un comentario