6
6
"""
7
7
8
8
import os , requests , json , sys , jsonrpclib
9
- from subprocess import Popen
9
+ from subprocess import Popen , PIPE
10
10
from hashlib import sha1
11
11
12
12
class CoreNLPLoadBalancer :
13
13
def __init__ (self , options ):
14
14
self .tempdir = "/tmp/"
15
15
self .ports = options .ports .split (',' )
16
16
self .host = options .host
17
- self .serverPool = []
17
+ self .serverPool = {}
18
18
self .processPool = {}
19
- self .args = ["python" , os .getcwd () + "/corenlp.py" , \
19
+ self .args = ["python" , os .getcwd () + "/corenlp/corenlp .py" , \
20
20
'--host=%s' % (options .host ), \
21
21
'--properties=%s' % (options .properties ), \
22
22
'--corenlp=%s' % (options .corenlp )]
23
23
if not options .verbose :
24
- args += ['--quiet' ]
24
+ self . args += ['--quiet' ]
25
25
self .portCounter = 0
26
-
26
+ self . startup ()
27
27
28
28
def startup (self ):
29
29
""" Open a traditional server subprocess in a new port """
30
30
for port in self .ports :
31
- self .serverPool [port ] = Popen (args + ['--port=%s' % str (port )])
31
+ self .serverPool [port ] = Popen (self . args + ['--port=%s' % str (port )])
32
32
33
33
def shutdown (self ):
34
34
for port in self .ports :
@@ -38,7 +38,7 @@ def sendThreadedRequest(self, key, port):
38
38
""" Create a process that communicates with the server in a thread to avoid blocking """
39
39
host = 'http://%s:%s' % (self .host .replace ('http://' , '' ), port )
40
40
filename = self .tempdir + key + ".tmp"
41
- self .processPool [key ] = [ Popen (['python' , ' subserver.py' , host , filename ], stdout = PIPE )]
41
+ self .processPool [key ] = Popen (['python' , os . getcwd () + '/corenlp/ subserver.py' , host , filename ], stdout = PIPE )
42
42
43
43
def send (self , text ):
44
44
"""
@@ -47,28 +47,60 @@ def send(self, text):
47
47
can correlate requests to responses.
48
48
"""
49
49
currentPort = self .ports [self .portCounter ]
50
- key = sha1 (text )
50
+ key = sha1 (text ). hexdigest ()
51
51
filename = self .tempdir + key + ".tmp"
52
52
f = open (filename , 'w' )
53
53
f .write (text )
54
54
f .close ()
55
55
self .sendThreadedRequest (key , currentPort )
56
56
return {'status' :'OK' , 'key' :key }
57
57
58
- def receive (self , blocking = False ):
59
- """ Returns all completed parses. Set blocking to True on your last iteration! """
60
- go = True
61
- response = []
62
- while go :
58
+ def getCompleted (self ):
59
+ """ Returns all completed parses. Set blocking to True on your last iteration to get all data """
60
+ docResponse = {}
61
+ response = { 'status' : 'OK' }
62
+ try :
63
63
for key in self .processPool .keys ():
64
+ print key
64
65
process = self .processPool [key ]
66
+ print process
65
67
if process .poll () != None :
66
- (out , error ) = process .communicate ()
67
- if out :
68
- try :
69
- response [key ] = [json .loads (out )]
70
- except :
71
- pass
68
+ docResponse [key ] = self .getForFinishedProcess (process )
72
69
del self .processPool [key ]
73
- go = blocking and len (self .processPool ) > 0
70
+ except :
71
+ response ['status' ] = 'ERROR'
72
+ response ['error' ] = sys .exc_info ()[1 ]
73
+ response ['parses' ] = docResponse
74
+ return response
75
+
76
+ def getAll (self ):
77
+ """ Blocking counterpart to getCompleted. Wait for all currently open processes to complete and send response. """
78
+ response = {}
79
+ for key in self .processPool .keys ():
80
+ response [key ] = self .getForKey (key )
81
+ return {'status' :'OK' , 'parses' :response }
82
+
83
+ def getForFinishedProcess (self , process ):
84
+ """ Returns a dictionary with json string or empty dictionary """
85
+ response = {}
86
+ print 'communicating'
87
+ (out , error ) = process .communicate ()
88
+ if out :
89
+ try :
90
+ response = json .loads (out )
91
+ except :
92
+ print sys .exc_info ()
93
+ pass
94
+ return response
95
+
96
+ def getForKey (self , key ):
97
+ """ Retrieves a response for a given key. This is blocking. """
98
+ response = {}
99
+ if key in self .processPool .keys ():
100
+ process = self .processPool [key ]
101
+ if process .poll == None :
102
+ process .wait ()
103
+ response [key ] = self .getForFinishedProcess (process )
104
+ else :
105
+ response [key ] = {}
74
106
return response
0 commit comments