Skip to content

Commit

Permalink
lint fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
guzzijones committed Sep 23, 2020
1 parent d01ba64 commit ca0d46f
Show file tree
Hide file tree
Showing 7 changed files with 46 additions and 14 deletions.
3 changes: 2 additions & 1 deletion st2client/st2client/commands/pack.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,8 @@ def run_and_print(self, args, **kwargs):

with term.TaskIndicator() as indicator:
events = ['st2.execution__create', 'st2.execution__update']
for event in stream_mgr.listen(events, **kwargs):
for event in stream_mgr.listen(events, end_execution_id=parent_id,
end_event="st2.execution__update", **kwargs):
execution = Execution(**event)

if execution.id == parent_id \
Expand Down
7 changes: 6 additions & 1 deletion st2client/st2client/models/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -638,6 +638,12 @@ def listen(self, events=None, **kwargs):
if 'api_key' in kwargs:
query_params['st2-api-key'] = kwargs.get('api_key')

if 'end_event' in kwargs:
query_params['end_event'] = kwargs.get('end_event')

if 'end_execution_id' in kwargs:
query_params['end_execution_id'] = kwargs.get('end_execution_id')

if events:
query_params['events'] = ','.join(events)

Expand All @@ -655,7 +661,6 @@ def listen(self, events=None, **kwargs):
# can be empty. In this case, rerun the query.
if not message.data:
continue

yield json.loads(message.data)


Expand Down
10 changes: 10 additions & 0 deletions st2common/st2common/openapi.yaml.j2
Original file line number Diff line number Diff line change
Expand Up @@ -4481,6 +4481,16 @@ paths:
description: |
Event stream endpoint.
parameters:
- name: end_execution_id
in: query
description: execution id used to find last event in stream
type: string
required: false
- name: end_event
in: query
description: event to end sse stream
type: string
required: false
- name: events
in: query
description: List of events to listen for. If not provided, it listens to all events.
Expand Down
18 changes: 12 additions & 6 deletions st2common/st2common/stream/listener.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,22 +78,29 @@ def emit(self, event, body):
for queue in self.queues:
queue.put(pack)

def generator(self, events=None, action_refs=None, execution_ids=None):
def generator(self, events=None, action_refs=None, execution_ids=None,
end_event=None, end_statuses=None, end_execution_id=None):
queue = eventlet.Queue()
queue.put('')
self.queues.append(queue)

try:
while not self._stopped:
stop = False
while not self._stopped and not stop:
try:
# TODO: Move to common option
message = queue.get(timeout=cfg.CONF.stream.heartbeat)

if not message:
LOG.debug("not_message_generator")
yield message
continue

event_name, body = message
# check to see if this is the last message to send.
if event_name == end_event:
if body is not None and \
body.status in end_statuses and \
end_execution_id is not None and \
body.id == end_execution_id:
stop = True
# TODO: We now do late filtering, but this could also be performed on the
# message bus level if we modified our exchange layout and utilize routing keys
# Filter on event name
Expand All @@ -109,7 +116,6 @@ def generator(self, events=None, action_refs=None, execution_ids=None):
LOG.debug('Skipping event "%s" with action_ref "%s"' % (event_name,
action_ref))
continue

# Filter on execution id
execution_id = self._get_execution_id_for_body(body=body)
if execution_ids and execution_id not in execution_ids:
Expand Down
6 changes: 3 additions & 3 deletions st2stream/st2stream/controllers/v1/executions.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,10 +135,10 @@ def format(gen):

def make_response():
app_iter = itertools.chain(existing_output_iter(), new_output_iter())
res = Response(headerlist=[("X-Accel-Buffering","no"),
res = Response(headerlist=[("X-Accel-Buffering", "no"),
('Cache-Control', 'no-cache'),
("Content-Type","text/event-stream; charset=UTF-8")],
app_iter=app_iter)
("Content-Type", "text/event-stream; charset=UTF-8")],
app_iter=app_iter)
return res

res = make_response()
Expand Down
15 changes: 12 additions & 3 deletions st2stream/st2stream/controllers/v1/stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,16 +54,25 @@ def format(gen):


class StreamController(object):
def get_all(self, events=None, action_refs=None, execution_ids=None, requester_user=None):
def get_all(self, end_execution_id=None, end_event=None,
events=None, action_refs=None, execution_ids=None, requester_user=None):
events = events if events else DEFAULT_EVENTS_WHITELIST
action_refs = action_refs if action_refs else None
execution_ids = execution_ids if execution_ids else None
end_statuses = ["failed", "succeeded"]

def make_response():
listener = get_listener(name='stream')
app_iter = format(listener.generator(events=events, action_refs=action_refs,
app_iter = format(listener.generator(events=events,
action_refs=action_refs,
end_event=end_event,
end_statuses=end_statuses,
end_execution_id=end_execution_id,
execution_ids=execution_ids))
res = Response(content_type='text/event-stream', app_iter=app_iter)
res = Response(headerlist=[("X-Accel-Buffering", "no"),
('Cache-Control', 'no-cache'),
("Content-Type", "text/event-stream; charset=UTF-8")],
app_iter=app_iter)
return res

stream = make_response()
Expand Down
1 change: 1 addition & 0 deletions test-requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ coverage==4.5.2
pep8==1.7.1
st2flake8==0.1.0
astroid==1.6.5
isort==4.3.21
pylint==1.9.4
pylint-plugin-utils>=0.4
bandit==1.5.1
Expand Down

0 comments on commit ca0d46f

Please sign in to comment.