# test.py 5.4 KB
import array
import errno
import fcntl
import os
import pty
import select
import signal
import sys
import termios
import tty
  10. # The following escape codes are xterm codes.
  11. # See http://rtfm.etla.org/xterm/ctlseq.html for more.
  12. START_ALTERNATE_MODE = set('\x1b[?{0}h'.format(i) for i in ('1049', '47', '1047'))
  13. END_ALTERNATE_MODE = set('\x1b[?{0}l'.format(i) for i in ('1049', '47', '1047'))
  14. ALTERNATE_MODE_FLAGS = tuple(START_ALTERNATE_MODE) + tuple(END_ALTERNATE_MODE)
  15. def findlast(s, substrs):
  16. '''
  17. Finds whichever of the given substrings occurs last in the given string
  18. and returns that substring, or returns None if no such strings
  19. occur.
  20. '''
  21. i = -1
  22. result = None
  23. for substr in substrs:
  24. pos = s.rfind(substr)
  25. if pos > i:
  26. i = pos
  27. result = substr
  28. return result
  29. class Interceptor(object):
  30. '''
  31. This class does the actual work of the pseudo terminal. The spawn()
  32. function is the main entrypoint.
  33. '''
  34. def __init__(self):
  35. self.master_fd = None
  36. self.buffer = ''
  37. def spawn(self, argv=None):
  38. '''
  39. Create a spawned process.
  40. Based on the code for pty.spawn().
  41. '''
  42. assert self.master_fd is None
  43. if not argv:
  44. argv = [os.environ['SHELL']]
  45. pid, master_fd = pty.fork()
  46. self.master_fd = master_fd
  47. if pid == pty.CHILD:
  48. os.execlp(argv[0], *argv)
  49. old_handler = signal.signal(signal.SIGWINCH, self._signal_winch)
  50. try:
  51. mode = tty.tcgetattr(pty.STDIN_FILENO)
  52. tty.setraw(pty.STDIN_FILENO)
  53. restore = 1
  54. except tty.error: # This is the same as termios.error
  55. restore = 0
  56. self._init_fd()
  57. try:
  58. self._copy()
  59. except (IOError, OSError):
  60. if restore:
  61. tty.tcsetattr(pty.STDIN_FILENO, tty.TCSAFLUSH, mode)
  62. os.close(master_fd)
  63. self.master_fd = None
  64. signal.signal(signal.SIGWINCH, old_handler)
  65. def _init_fd(self):
  66. '''
  67. Called once when the pty is first set up.
  68. '''
  69. self._set_pty_size()
  70. def _signal_winch(self, signum, frame):
  71. '''
  72. Signal handler for SIGWINCH - window size has changed.
  73. '''
  74. self._set_pty_size()
  75. def _set_pty_size(self):
  76. '''
  77. Sets the window size of the child pty based on the window size of
  78. our own controlling terminal.
  79. '''
  80. assert self.master_fd is not None
  81. # Get the terminal size of the real terminal, set it on the pseudoterminal.
  82. buf = array.array('h', [0, 0, 0, 0])
  83. fcntl.ioctl(pty.STDOUT_FILENO, termios.TIOCGWINSZ, buf, True)
  84. fcntl.ioctl(self.master_fd, termios.TIOCSWINSZ, buf)
  85. def _copy(self):
  86. '''
  87. Main select loop. Passes all data to self.master_read() or
  88. self.stdin_read().
  89. '''
  90. assert self.master_fd is not None
  91. master_fd = self.master_fd
  92. while 1:
  93. try:
  94. rfds, wfds, xfds = select.select([master_fd, pty.STDIN_FILENO], [], [])
  95. except select.error, e:
  96. if e[0] == 4: # Interrupted system call.
  97. continue
  98. if master_fd in rfds:
  99. data = os.read(self.master_fd, 1024)
  100. self.master_read(data)
  101. if pty.STDIN_FILENO in rfds:
  102. data = os.read(pty.STDIN_FILENO, 1024)
  103. self.stdin_read(data)
  104. def write_stdout(self, data):
  105. '''
  106. Writes to stdout as if the child process had written the data.
  107. '''
  108. os.write(pty.STDOUT_FILENO, data)
  109. def write_master(self, data):
  110. '''
  111. Writes to the child process from its controlling terminal.
  112. '''
  113. master_fd = self.master_fd
  114. assert master_fd is not None
  115. self.buffer = self.buffer + data
  116. if 'echo' in self.buffer:
  117. print self.buffer
  118. while data != '':
  119. n = os.write(master_fd, data)
  120. data = data[n:]
  121. def master_read(self, data):
  122. '''
  123. Called when there is data to be sent from the child process back to
  124. the user.
  125. '''
  126. flag = findlast(data, ALTERNATE_MODE_FLAGS)
  127. if flag is not None:
  128. if flag in START_ALTERNATE_MODE:
  129. # This code is executed when the child process switches the
  130. # terminal into alternate mode. The line below
  131. # assumes that the user has opened vim, and writes a
  132. # message.
  133. self.write_master('IEntering special mode.\x1b')
  134. elif flag in END_ALTERNATE_MODE:
  135. # This code is executed when the child process switches the
  136. # terminal back out of alternate mode. The line below
  137. # assumes that the user has returned to the command
  138. # prompt.
  139. self.write_master('echo "Leaving special mode."\r')
  140. self.write_stdout(data)
  141. def stdin_read(self, data):
  142. '''
  143. Called when there is data to be sent from the user/controlling
  144. terminal down to the child process.
  145. '''
  146. #print i.write_stdout(data)
  147. self.write_master(data)
  148. if __name__ == '__main__':
  149. i = Interceptor()
  150. i.write_stdout('\nThe dream has begun.\n')
  151. i.spawn(sys.argv[1:])
  152. i.write_stdout('\nThe dream is (probably) over.\n')