test.py 5.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167
  1. import array
  2. import fcntl
  3. import os
  4. import pty
  5. import select
  6. import signal
  7. import sys
  8. import termios
  9. import tty
  10. # The following escape codes are xterm codes.
  11. # See http://rtfm.etla.org/xterm/ctlseq.html for more.
  12. START_ALTERNATE_MODE = set('\x1b[?{0}h'.format(i) for i in ('1049', '47', '1047'))
  13. END_ALTERNATE_MODE = set('\x1b[?{0}l'.format(i) for i in ('1049', '47', '1047'))
  14. ALTERNATE_MODE_FLAGS = tuple(START_ALTERNATE_MODE) + tuple(END_ALTERNATE_MODE)
  15. def findlast(s, substrs):
  16. '''
  17. Finds whichever of the given substrings occurs last in the given string
  18. and returns that substring, or returns None if no such strings
  19. occur.
  20. '''
  21. i = -1
  22. result = None
  23. for substr in substrs:
  24. pos = s.rfind(substr)
  25. if pos > i:
  26. i = pos
  27. result = substr
  28. return result
  29. class Interceptor(object):
  30. '''
  31. This class does the actual work of the pseudo terminal. The spawn()
  32. function is the main entrypoint.
  33. '''
  34. def __init__(self):
  35. self.master_fd = None
  36. def spawn(self, argv=None):
  37. '''
  38. Create a spawned process.
  39. Based on the code for pty.spawn().
  40. '''
  41. assert self.master_fd is None
  42. if not argv:
  43. argv = [os.environ['SHELL']]
  44. pid, master_fd = pty.fork()
  45. self.master_fd = master_fd
  46. if pid == pty.CHILD:
  47. os.execlp(argv[0], *argv)
  48. old_handler = signal.signal(signal.SIGWINCH, self._signal_winch)
  49. try:
  50. mode = tty.tcgetattr(pty.STDIN_FILENO)
  51. tty.setraw(pty.STDIN_FILENO)
  52. restore = 1
  53. except tty.error: # This is the same as termios.error
  54. restore = 0
  55. self._init_fd()
  56. try:
  57. self._copy()
  58. except (IOError, OSError):
  59. if restore:
  60. tty.tcsetattr(pty.STDIN_FILENO, tty.TCSAFLUSH, mode)
  61. os.close(master_fd)
  62. self.master_fd = None
  63. signal.signal(signal.SIGWINCH, old_handler)
  64. def _init_fd(self):
  65. '''
  66. Called once when the pty is first set up.
  67. '''
  68. self._set_pty_size()
  69. def _signal_winch(self, signum, frame):
  70. '''
  71. Signal handler for SIGWINCH - window size has changed.
  72. '''
  73. self._set_pty_size()
  74. def _set_pty_size(self):
  75. '''
  76. Sets the window size of the child pty based on the window size of
  77. our own controlling terminal.
  78. '''
  79. assert self.master_fd is not None
  80. # Get the terminal size of the real terminal, set it on the pseudoterminal.
  81. buf = array.array('h', [0, 0, 0, 0])
  82. fcntl.ioctl(pty.STDOUT_FILENO, termios.TIOCGWINSZ, buf, True)
  83. fcntl.ioctl(self.master_fd, termios.TIOCSWINSZ, buf)
  84. def _copy(self):
  85. '''
  86. Main select loop. Passes all data to self.master_read() or
  87. self.stdin_read().
  88. '''
  89. assert self.master_fd is not None
  90. master_fd = self.master_fd
  91. while 1:
  92. try:
  93. rfds, wfds, xfds = select.select([master_fd, pty.STDIN_FILENO], [], [])
  94. except select.error, e:
  95. if e[0] == 4: # Interrupted system call.
  96. continue
  97. if master_fd in rfds:
  98. data = os.read(self.master_fd, 1024)
  99. self.master_read(data)
  100. if pty.STDIN_FILENO in rfds:
  101. data = os.read(pty.STDIN_FILENO, 1024)
  102. self.stdin_read(data)
  103. def write_stdout(self, data):
  104. '''
  105. Writes to stdout as if the child process had written the data.
  106. '''
  107. os.write(pty.STDOUT_FILENO, data)
  108. def write_master(self, data):
  109. '''
  110. Writes to the child process from its controlling terminal.
  111. '''
  112. master_fd = self.master_fd
  113. assert master_fd is not None
  114. while data != '':
  115. n = os.write(master_fd, data)
  116. data = data[n:]
  117. def master_read(self, data):
  118. '''
  119. Called when there is data to be sent from the child process back to
  120. the user.
  121. '''
  122. flag = findlast(data, ALTERNATE_MODE_FLAGS)
  123. if flag is not None:
  124. if flag in START_ALTERNATE_MODE:
  125. # This code is executed when the child process switches the
  126. # terminal into alternate mode. The line below
  127. # assumes that the user has opened vim, and writes a
  128. # message.
  129. self.write_master('IEntering special mode.\x1b')
  130. elif flag in END_ALTERNATE_MODE:
  131. # This code is executed when the child process switches the
  132. # terminal back out of alternate mode. The line below
  133. # assumes that the user has returned to the command
  134. # prompt.
  135. self.write_master('echo "Leaving special mode."\r')
  136. self.write_stdout(data)
  137. def stdin_read(self, data):
  138. '''
  139. Called when there is data to be sent from the user/controlling
  140. terminal down to the child process.
  141. '''
  142. print data + '_'
  143. self.write_master(data)
  144. if __name__ == '__main__':
  145. i = Interceptor()
  146. i.write_stdout('\nThe dream has begun.\n')
  147. i.spawn(sys.argv[1:])
  148. i.write_stdout('\nThe dream is (probably) over.\n')