Welcome to ShenZhenJia Knowledge Sharing Community for programmer and developer-Open, Learning and Share
menu search
person
Welcome To Ask or Share your Answers For Others

Categories

I have three commands that would otherwise be easily chained together on the command-line like so:

$ echo foo | firstCommand - | secondCommand - | thirdCommand - > finalOutput

In other words, the firstCommand processes foo from standard input and pipes the result to secondCommand, which in turn processes that input and pipes its output to thirdCommand, which does processing and redirects its output to the file finalOutput.

I have been trying to recapitulate this in a Python script, using threading. I'd like to use Python in order to manipulate the output from firstCommand before passing it to secondCommand, and again between secondCommand and thirdCommand.

Here's an excerpt of code that does not seem to work:

first_process = subprocess.Popen(['firstCommand', '-'], stdin=subprocess.PIPE, stdout=subprocess.PIPE)
second_process = subprocess.Popen(['secondCommand', '-'], stdin=subprocess.PIPE, stdout=subprocess.PIPE)
third_process = subprocess.Popen(['thirdCommand', '-'], stdin=subprocess.PIPE, stdout=sys.stdout)

first_thread = threading.Thread(target=consumeOutputFromStdin, args=(sys.stdin, first_process.stdin))
second_thread = threading.Thread(target=consumeOutputFromFirstCommand, args=(first_process.stdout, second_process.stdin))
third_thread = threading.Thread(target=consumeOutputFromSecondCommand, args=(second_process.stdout, third_process.stdin))

first_thread.start()
second_thread.start()
third_thread.start()

first_thread.join()
second_thread.join()
third_thread.join()

first_process.communicate()
second_process.communicate()
third_process.communicate()

# read 1K chunks from standard input
def consumeOutputFromStdin(from_stream, to_stream):
    chunk = from_stream.read(1024)
    while chunk:
        to_stream.write(chunk)
        to_stream.flush()
        chunk = from_stream.read(1024)

def consumeOutputFromFirstCommand(from_stream, to_stream):
    while True:
        unprocessed_line = from_stream.readline()
        if not unprocessed_line:
            break
        processed_line = some_python_function_that_processes_line(unprocessed_line)
        to_stream.write(processed_line)
        to_stream.flush()

def consumeOutputFromSecondCommand(from_stream, to_stream):
    while True:
        unprocessed_line = from_stream.readline()
        if not unprocessed_line:
            break
        processed_line = a_different_python_function_that_processes_line(unprocessed_line)
        to_stream.write(processed_line)
        to_stream.flush()

When I run this, the script hangs:

$ echo foo | ./myConversionScript.py
** hangs here... **

If I hit Ctrl-C to terminate the script, the code is stuck on the line third_thread.join():

  C-c C-c
Traceback (most recent call last):
  File "./myConversionScript.py", line 786, in <module>
    sys.exit(main(*sys.argv))
  File "./myConversionScript.py", line 556, in main
    third_thread.join()
  File "/home/foo/proj/tools/lib/python2.7/threading.py", line 949, in join
    self.__block.wait()
  File "/home/foo/proj/tools/lib/python2.7/threading.py", line 339, in wait
    waiter.acquire()
KeyboardInterrupt

If I don't use a third_process and third_thread, instead only passing data from the output of the first thread to the input of the second thread, there is no hang.

Something about the third thread seems to cause things to break, but I don't know why.

I thought the point of communicate() is that it will handle I/O for the three processes, so I'm not sure why there is an I/O hang.

How do I get three or more commands/processes working together, where one thread consumes the output of another thread/process?

UPDATE

Okay, I made some changes that seem to help, based on some comments here and on other sites. The processes are made to wait() for completion, and within the thread methods, I close() the pipes once the thread has processed all the data that it can. My concern is that memory usage will be very high for large datasets, but at least things are working:

first_process = subprocess.Popen(['firstCommand', '-'], stdin=subprocess.PIPE, stdout=subprocess.PIPE)
second_process = subprocess.Popen(['secondCommand', '-'], stdin=subprocess.PIPE, stdout=subprocess.PIPE)
third_process = subprocess.Popen(['thirdCommand', '-'], stdin=subprocess.PIPE, stdout=sys.stdout)

first_thread = threading.Thread(target=consumeOutputFromStdin, args=(sys.stdin, first_process.stdin))
second_thread = threading.Thread(target=consumeOutputFromFirstCommand, args=(first_process.stdout, second_process.stdin))
third_thread = threading.Thread(target=consumeOutputFromSecondCommand, args=(second_process.stdout, third_process.stdin))

first_thread.start()
second_thread.start()
third_thread.start()

first_thread.join()
second_thread.join()
third_thread.join()

first_process.wait()
second_process.wait()
third_process.wait()

# read 1K chunks from standard input
def consumeOutputFromStdin(from_stream, to_stream):
    chunk = from_stream.read(1024)
    while chunk:
        to_stream.write(chunk)
        to_stream.flush()
        chunk = from_stream.read(1024)

def consumeOutputFromFirstCommand(from_stream, to_stream):
    while True:
        unprocessed_line = from_stream.readline()
        if not unprocessed_line:
            from_stream.close()
            to_stream.close()
            break
        processed_line = some_python_function_that_processes_line(unprocessed_line)
        to_stream.write(processed_line)
        to_stream.flush()

def consumeOutputFromSecondCommand(from_stream, to_stream):
    while True:
        unprocessed_line = from_stream.readline()
        if not unprocessed_line:
            from_stream.close()
            to_stream.close()
            break
        processed_line = a_different_python_function_that_processes_line(unprocessed_line)
        to_stream.write(processed_line)
        to_stream.flush()
See Question&Answers more detail:os

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
thumb_up_alt 0 like thumb_down_alt 0 dislike
1.1k views
Welcome To Ask or Share your Answers For Others

1 Answer

To emulate:

echo foo |
firstCommand - | somePythonRoutine - |
secondCommand - | anotherPythonRoutine - |
thirdCommand - > finalOutput

your current approach with threads works:

from subprocess import Popen, PIPE

first = Popen(["firstCommand", "-"], stdin=PIPE, stdout=PIPE, bufsize=1)
second = Popen(["secondCommand", "-"], stdin=PIPE, stdout=PIPE, bufsize=1)
bind(first.stdout, second.stdin, somePythonRoutine)
with open("finalOutput", "wb") as file:
    third = Popen(["thirdCommand", "-"], stdin=PIPE, stdout=file, bufsize=1)
bind(second.stdout, third.stdin, anotherPythonRoutine)

# provide input for the pipeline
first.stdin.write(b"foo")
first.stdin.close()

# wait for it to complete
pipestatus = [p.wait() for p in [first, second, third]]

where each bind() starts a new thread:

from threading import Thread

def bind(input_pipe, output_pipe, line_filter):
    def f():
        try:
            for line in iter(input_pipe.readline, b''):
                line = line_filter(line)
                if line:
                    output_pipe.write(line) # no flush unless newline present
        finally:
            try:
                output_pipe.close()
            finally:
                input_pipe.close()
    t = Thread(target=f)
    t.daemon = True # die if the program exits
    t.start()

and somePythonRoutine, anotherPythonRoutine accept a single line and return it (possibly modified).


与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
thumb_up_alt 0 like thumb_down_alt 0 dislike
Welcome to ShenZhenJia Knowledge Sharing Community for programmer and developer-Open, Learning and Share
...