Commit 1790cd09 by Matthias Putz

Portable: process output reading, using sockets on Windows instead of fcntl

parent deac6e35
...@@ -14,11 +14,12 @@ ...@@ -14,11 +14,12 @@
# limitations under the License. # limitations under the License.
from __future__ import print_function from __future__ import print_function
import fcntl #import fcntl
import os import os
import select import select
import sys import sys
import subprocess import subprocess
import portable
import tempfile import tempfile
from signal import SIGTERM from signal import SIGTERM
from error import GitError from error import GitError
...@@ -78,15 +79,15 @@ def terminate_ssh_clients(): ...@@ -78,15 +79,15 @@ def terminate_ssh_clients():
_git_version = None _git_version = None
class _sfd(object): # class _sfd(object):
"""select file descriptor class""" # """select file descriptor class"""
def __init__(self, fd, dest, std_name): # def __init__(self, fd, dest, std_name):
assert std_name in ('stdout', 'stderr') # assert std_name in ('stdout', 'stderr')
self.fd = fd # self.fd = fd
self.dest = dest # self.dest = dest
self.std_name = std_name # self.std_name = std_name
def fileno(self): # def fileno(self):
return self.fd.fileno() # return self.fd.fileno()
class _GitCall(object): class _GitCall(object):
def version(self): def version(self):
...@@ -250,19 +251,22 @@ class GitCommand(object): ...@@ -250,19 +251,22 @@ class GitCommand(object):
def _CaptureOutput(self): def _CaptureOutput(self):
p = self.process p = self.process
s_in = [_sfd(p.stdout, sys.stdout, 'stdout'), # s_in = [_sfd(p.stdout, sys.stdout, 'stdout'),
_sfd(p.stderr, sys.stderr, 'stderr')] # _sfd(p.stderr, sys.stderr, 'stderr')]
s_in = [portable.input_reader(p.stdout, sys.stdout, 'stdout'),
portable.input_reader(p.stderr, sys.stderr, 'stderr')]
self.stdout = '' self.stdout = ''
self.stderr = '' self.stderr = ''
for s in s_in: # for s in s_in:
flags = fcntl.fcntl(s.fd, fcntl.F_GETFL) # flags = fcntl.fcntl(s.fd, fcntl.F_GETFL)
fcntl.fcntl(s.fd, fcntl.F_SETFL, flags | os.O_NONBLOCK) # fcntl.fcntl(s.fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)
while s_in: while s_in:
in_ready, _, _ = select.select(s_in, [], []) in_ready, _, _ = select.select(s_in, [], [])
for s in in_ready: for s in in_ready:
buf = s.fd.read(4096) # buf = s.fd.read(4096)
buf = s.read(4096)
if not buf: if not buf:
s_in.remove(s) s_in.remove(s)
continue continue
......
import os import os
import platform import platform
import socket
import threading
from trace import Trace
def isUnix(): def isUnix():
return platform.system() != "Windows" return platform.system() != "Windows"
if isUnix():
import fcntl
def to_windows_path(path): def to_windows_path(path):
return path.replace('/', '\\') return path.replace('/', '\\')
def input_reader(src, dest, std_name):
if isUnix():
return file_reader(src, dest, std_name)
else:
return socket_reader(src, dest, std_name)
class file_reader(object):
"""select file descriptor class"""
def __init__(self, fd, dest, std_name):
assert std_name in ('stdout', 'stderr')
self.fd = fd
self.dest = dest
self.std_name = std_name
self.setup_fd()
def setup_fd(self):
flags = fcntl.fcntl(self.fd, fcntl.F_GETFL)
fcntl.fcntl(self.fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)
def fileno(self):
return self.fd.fileno()
def read(self, bufsize):
return self.fd.read(bufsize)
class socket_reader():
"""select socket with file descriptor class"""
def __init__(self, src, dest, std_name=''):
self.src = src
self.dest = dest
self.std_name = std_name
self.completed = False
self.host = "localhost"
self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.server_socket.bind((self.host, 0))
self.server_socket.setblocking(0)
self.port = self.server_socket.getsockname()[1]
address = (self.host, self.port)
self.client_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.client_socket.connect(address)
t = threading.Thread(target=self.send_msg, args=(self.src, self.client_socket, address))
t.start()
def send_msg(self, src, dest, address):
while True:
data = src.read(4096)
if data:
dest.sendto(data, address)
else:
break
dest.sendto("", address)
def read(self, bufsize):
try:
return self.server_socket.recv(bufsize)
except Exception as e:
Trace("failed to read from server socket: " + e.strerror)
self.client_socket.close()
self.server_socket.close()
def fileno(self):
return self.server_socket.fileno()
...@@ -15,7 +15,7 @@ ...@@ -15,7 +15,7 @@
from __future__ import print_function from __future__ import print_function
import errno import errno
import fcntl #import fcntl
import multiprocessing import multiprocessing
import re import re
import os import os
...@@ -23,6 +23,7 @@ import select ...@@ -23,6 +23,7 @@ import select
import signal import signal
import sys import sys
import subprocess import subprocess
import portable
from color import Coloring from color import Coloring
from command import Command, MirrorSafeCommand from command import Command, MirrorSafeCommand
...@@ -337,28 +338,31 @@ def DoWork(project, mirror, opt, cmd, shell, cnt, config): ...@@ -337,28 +338,31 @@ def DoWork(project, mirror, opt, cmd, shell, cnt, config):
if opt.project_header: if opt.project_header:
out = ForallColoring(config) out = ForallColoring(config)
out.redirect(sys.stdout) out.redirect(sys.stdout)
class sfd(object): # class sfd(object):
def __init__(self, fd, dest): # def __init__(self, fd, dest):
self.fd = fd # self.fd = fd
self.dest = dest # self.dest = dest
def fileno(self): # def fileno(self):
return self.fd.fileno() # return self.fd.fileno()
empty = True empty = True
errbuf = '' errbuf = ''
p.stdin.close() p.stdin.close()
s_in = [sfd(p.stdout, sys.stdout), # s_in = [sfd(p.stdout, sys.stdout),
sfd(p.stderr, sys.stderr)] # sfd(p.stderr, sys.stderr)]
s_in = [portable.input_reader(p.stdout, sys.stdout, 'stdout'),
portable.input_reader(p.stderr, sys.stderr, 'stderr')]
for s in s_in: # for s in s_in:
flags = fcntl.fcntl(s.fd, fcntl.F_GETFL) # flags = fcntl.fcntl(s.fd, fcntl.F_GETFL)
fcntl.fcntl(s.fd, fcntl.F_SETFL, flags | os.O_NONBLOCK) # fcntl.fcntl(s.fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)
while s_in: while s_in:
in_ready, _out_ready, _err_ready = select.select(s_in, [], []) in_ready, _out_ready, _err_ready = select.select(s_in, [], [])
for s in in_ready: for s in in_ready:
buf = s.fd.read(4096) # buf = s.fd.read(4096)
buf = s.read(4096)
if not buf: if not buf:
s.fd.close() s.fd.close()
s_in.remove(s) s_in.remove(s)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment