domingo, 11 de agosto de 2013

A serial terminal in Python using curses

I have written this small program to interact with a device using Python. It is basically a serial terminal built with Python and curses. Why write yet another terminal program? In my case, so that I can extend it: the plan is to have it parse the device's output and control other systems. Here is the code:


#!/usr/bin/env python

import serial
import logging 
import threading
import time
import curses
import Queue

# Device node of the serial port that the terminal attaches to.
serial_port='/dev/ttyUSB0'
# Shared stop flag: every worker thread keeps looping while this is False.
# KeyboardThread sets it to True when the exit key sequence is typed.
g_exit_threads = False

class SerialPortThread(threading.Thread):
    """Worker thread that pumps data from the serial port into a queue.

    port   -- object whose read() returns the data received (possibly empty)
    logger -- object providing info() for progress messages
    queue  -- object whose put() receives every non-empty read result
    """

    def __init__(self, port, logger, queue):
        """Store the collaborators; the thread is not started here."""
        super(SerialPortThread, self).__init__()
        self._queue = queue
        self._logger = logger
        self._port = port

    def run(self):
        """Read from the port until the module-level g_exit_threads is set.

        Empty reads are discarded; anything else is put on the queue as
        returned by the port.
        """
        log = self._logger
        log.info("SerialPortThread.run. ")
        while True:
            if g_exit_threads:
                break
            received = self._port.read()
            if len(received) == 0:
                # Nothing arrived on this read; poll the port again.
                continue
            self._queue.put(received)
        log.info("SerialPortThread. Exiting ")
            
            
        
class KeyboardThread(threading.Thread):
    """Forward key presses from a curses window to the serial port.

    Each single-character key read from the window is written to the port
    unchanged. The key stream is also watched for the exit sequence
    ESC 'O' 'P' (presumably what the terminal sends for F1 when keypad
    translation is off -- confirm for your terminal); when the sequence is
    completed the module-level g_exit_threads flag is set so that every
    worker thread stops.

    screen -- curses window used for getkey(); expected to be in nodelay mode
    logger -- object providing info() and exception()
    port   -- object whose write() sends a key to the device
    """

    # Character codes of the key sequence that ends the program.
    _EXIT_SEQUENCE = (27, 79, 80)

    def __init__(self, screen, logger, port):
        """Store the collaborators; the thread is not started here."""
        threading.Thread.__init__(self)
        self._screen = screen
        self._logger = logger
        self._port = port

    def run(self):
        """Poll the keyboard until g_exit_threads is set."""
        global g_exit_threads
        self._logger.info("KeyboardThread.Run")
        matched = 0  # how many characters of _EXIT_SEQUENCE have been seen
        while not g_exit_threads:
            try:
                key = self._screen.getkey()
            except curses.error:
                # getkey() raises when no key is pending (nodelay mode);
                # back off briefly instead of spinning.
                time.sleep(0.1)
                continue

            self._logger.info("KeyboardThread.run. Event: >>" + key + "<<")

            if len(key) != 1:
                # getkey() reports special keys as a multi-character key
                # name (e.g. "KEY_RESIZE"). That name is curses-internal
                # text, not terminal input, so it is not sent to the device.
                continue

            try:
                self._port.write(key)
            except Exception:
                # Best effort, as before: a failed write must not kill the
                # thread, but it is now recorded instead of silently lost.
                self._logger.exception(
                    "KeyboardThread. Could not write key to port")
                time.sleep(0.1)
                continue

            code = ord(key)
            if code == self._EXIT_SEQUENCE[matched]:
                matched += 1
                if matched == len(self._EXIT_SEQUENCE):
                    self._logger.info("KeyboardThread. Exiting")
                    g_exit_threads = True
                    matched = 0
            elif code == self._EXIT_SEQUENCE[0]:
                # A fresh ESC in the middle of a partial match starts a new
                # match instead of discarding it (ESC ESC O P must exit).
                matched = 1
            else:
                matched = 0
        
class OutputThread(threading.Thread):
    """Draw characters taken from a queue on the bottom line of a window.

    screen -- curses window used for getmaxyx(), addstr() and scroll()
    logger -- object providing info() and exception()
    queue  -- queue of single characters; get(timeout=...) is expected to
              raise Queue.Empty when nothing arrives in time
    """

    def __init__(self, screen, logger, queue):
        """Store the collaborators; the thread is not started here."""
        threading.Thread.__init__(self)
        self._screen = screen
        self._logger = logger
        self._queue = queue

    def run(self):
        """Render queued characters until g_exit_threads is set.

        A newline scrolls the window and restarts the line, backspace
        (code 8) blanks a cell, bell (code 7) flashes the screen, and any
        other character is drawn at the current column of the last row.
        """
        self._logger.info("OutputThread.Run")
        height, _width = self._screen.getmaxyx()
        row = height - 1
        # NOTE(review): the first line starts at column 2, lines after a
        # newline start at column 1, and backspace never moves left of
        # column 2 -- confirm which left margin is intended.
        pos = 2
        while not g_exit_threads:
            try:
                e = self._queue.get(timeout=0.1)
            except Queue.Empty:
                # Nothing to draw yet; loop to re-check the exit flag.
                continue

            code = ord(e[0])
            self._logger.info("OutputThread: " + str(code))
            try:
                if e == '\n':
                    pos = 1
                    self._screen.scroll()
                else:
                    self._logger.info("Char: " + str(code))
                    if code == 8:
                        if pos > 2:
                            pos -= 1
                        self._screen.addstr(row, pos, ' ')
                    elif code == 7:
                        curses.flash()
                    else:
                        self._screen.addstr(row, pos, e)
                        pos += 1
            except curses.error:
                # Drawing can fail (e.g. the column is off-screen). The
                # character is dropped as before, but the failure is now
                # logged instead of being silently swallowed.
                self._logger.exception(
                    "OutputThread. Could not draw at column %d", pos)

        self._logger.info("OutputThread. Exiting")



if __name__ == "__main__":
    # Script entry point: wire the serial port, the curses screen and the
    # three worker threads together, then wait for the workers to finish.
    logging.basicConfig(format='%(asctime)-6s: %(name)s - %(levelname)s - %(message)s', level=logging.DEBUG,
            filename="console.log")

    queue = Queue.Queue()

    # Open the port before touching the terminal, so a failure to open it
    # is reported on a normal (non-curses) terminal.
    port = serial.Serial(serial_port, 115200, timeout=0.1)
    try:
        screen = curses.initscr()
        try:
            curses.noecho()
            curses.curs_set(0)
            curses.raw()
            # Keypad translation off: special keys arrive as raw escape
            # sequences, which KeyboardThread forwards and inspects.
            screen.keypad(0)
            screen.clear()
            # Non-blocking input so the keyboard thread can poll the
            # exit flag between key presses.
            screen.nodelay(True)
            y, x = screen.getmaxyx()
            # Scroll everything except the top line.
            screen.setscrreg(1, y - 1)
            screen.scrollok(True)

            threads = [
                SerialPortThread(port, logging, queue),
                KeyboardThread(screen, logging, port),
                OutputThread(screen, logging, queue),
            ]

            for t in threads:
                t.daemon = True
                t.start()

            # Wait for all the threads to finish.
            for t in threads:
                t.join()
        finally:
            # Ask any worker that is still running to stop, then always
            # restore the terminal -- even when an error occurred --
            # so the shell is not left in raw/noecho mode.
            g_exit_threads = True
            curses.endwin()
    finally:
        port.close()


No hay comentarios:

Publicar un comentario