Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix sse and nginx buffering #5042

Merged
merged 23 commits into from
Nov 21, 2020
Merged
Show file tree
Hide file tree
Changes from 20 commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
b060886
attempt at fixing sse and nginx buffering
guzzijones Sep 16, 2020
b7c9157
add reasonable request timeout for stream request; add cache-control …
guzzijones Sep 16, 2020
d01ba64
update connect timeout for stream to 90 seconds
guzzijones Sep 16, 2020
ca0d46f
lint fixes
guzzijones Sep 23, 2020
dd63d6d
set new proxy_read_timeout and proxy_connect_timeout for stream endpoint
guzzijones Sep 23, 2020
b60d36d
remove timeout in requests.get
guzzijones Sep 23, 2020
7dd7633
update openapi.yaml
guzzijones Sep 23, 2020
008e5e3
flake fix
guzzijones Sep 23, 2020
c7d7034
remove debugging
guzzijones Sep 23, 2020
c04c8b2
add unit test for stream generator
guzzijones Sep 25, 2020
b973f06
use constant for completed states
guzzijones Sep 28, 2020
c3379f6
Revert "use constant for completed states"
guzzijones Sep 28, 2020
aed4d3c
Revert "Revert "use constant for completed states""
guzzijones Sep 28, 2020
1555abe
fix completed statuses
guzzijones Sep 28, 2020
b174564
end end execution id to tail command
guzzijones Sep 28, 2020
5920690
line length fix
guzzijones Sep 28, 2020
f1f746b
Rename test_generator.py to test_stream_generator.py
m4dcoder Nov 13, 2020
6bc0ac1
Merge branch 'master' into client_nginx
m4dcoder Nov 13, 2020
c26c86d
Rollback some code reordering to fix references
m4dcoder Nov 14, 2020
a7d0d3f
update change log for PR #5042
guzzijones Nov 17, 2020
5c797c8
update contributed by in changelog
guzzijones Nov 17, 2020
e38ddf9
edit change log per request
guzzijones Nov 18, 2020
1ebe2be
Merge branch 'master' into client_nginx
m4dcoder Nov 20, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,11 @@ Changed
* Improve the st2-self-check script to echo to stderr and exit if it isn't run with a
ST2_AUTH_TOKEN or ST2_API_KEY environment variable. (improvement) #5068

Fixed
~~~~~~~~~
* Fix nginx buffering long polling stream to client. Instead of waiting for closed connection
wait for final event to be sent to client. fixes issue #4842

3.3.0 - October 06, 2020
------------------------

Expand Down
2 changes: 2 additions & 0 deletions conf/nginx/st2.conf
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,8 @@ server {
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_read_timeout 200;
proxy_connect_timeout 200;

sendfile on;
tcp_nopush on;
Expand Down
5 changes: 5 additions & 0 deletions st2client/st2client/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,11 @@ def get_client(self, args, debug=False):
# Precedence order: cli arguments > environment variables > rc file variables
cli_options = ['base_url', 'auth_url', 'api_url', 'stream_url', 'api_version', 'cacert']
cli_options = {opt: getattr(args, opt, None) for opt in cli_options}
if cli_options.get("cacert", None) is not None:
if cli_options["cacert"].lower() in ['true', '1', 't', 'y', 'yes']:
cli_options["cacert"] = True
elif cli_options["cacert"].lower() in ['false', '0', 'f', 'no']:
cli_options["cacert"] = False
config_file_options = self._get_config_file_options(args=args)

kwargs = {}
Expand Down
5 changes: 4 additions & 1 deletion st2client/st2client/commands/action.py
Original file line number Diff line number Diff line change
Expand Up @@ -1502,7 +1502,10 @@ def tail_execution(cls, execution_manager, stream_manager, execution, output_typ
workflow_execution_ids.update(children_execution_ids)

events = ['st2.execution__update', 'st2.execution.output__create']
for event in stream_manager.listen(events, **kwargs):
for event in stream_manager.listen(events,
end_execution_id=execution_id,
end_event="st2.execution__update",
**kwargs):
status = event.get('status', None)
is_execution_event = status is not None

Expand Down
3 changes: 2 additions & 1 deletion st2client/st2client/commands/pack.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,8 @@ def run_and_print(self, args, **kwargs):

with term.TaskIndicator() as indicator:
events = ['st2.execution__create', 'st2.execution__update']
for event in stream_mgr.listen(events, **kwargs):
for event in stream_mgr.listen(events, end_execution_id=parent_id,
end_event="st2.execution__update", **kwargs):
execution = Execution(**event)

if execution.id == parent_id \
Expand Down
7 changes: 6 additions & 1 deletion st2client/st2client/models/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -638,6 +638,12 @@ def listen(self, events=None, **kwargs):
if 'api_key' in kwargs:
query_params['st2-api-key'] = kwargs.get('api_key')

if 'end_event' in kwargs:
query_params['end_event'] = kwargs.get('end_event')

if 'end_execution_id' in kwargs:
query_params['end_execution_id'] = kwargs.get('end_execution_id')

if events:
query_params['events'] = ','.join(events)

Expand All @@ -655,7 +661,6 @@ def listen(self, events=None, **kwargs):
# can be empty. In this case, rerun the query.
if not message.data:
continue

yield json.loads(message.data)


Expand Down
10 changes: 10 additions & 0 deletions st2common/st2common/openapi.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4485,6 +4485,16 @@ paths:
description: |
Event stream endpoint.
parameters:
- name: end_execution_id
in: query
description: execution id used to find last event in stream
type: string
required: false
- name: end_event
in: query
description: event to end sse stream
type: string
required: false
- name: events
in: query
description: List of events to listen for. If not provided, it listens to all events.
Expand Down
10 changes: 10 additions & 0 deletions st2common/st2common/openapi.yaml.j2
Original file line number Diff line number Diff line change
Expand Up @@ -4481,6 +4481,16 @@ paths:
description: |
Event stream endpoint.
parameters:
- name: end_execution_id
in: query
description: execution id used to find last event in stream
type: string
required: false
- name: end_event
in: query
description: event to end sse stream
type: string
required: false
- name: events
in: query
description: List of events to listen for. If not provided, it listens to all events.
Expand Down
17 changes: 11 additions & 6 deletions st2common/st2common/stream/listener.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,22 +78,28 @@ def emit(self, event, body):
for queue in self.queues:
queue.put(pack)

def generator(self, events=None, action_refs=None, execution_ids=None):
def generator(self, events=None, action_refs=None, execution_ids=None,
end_event=None, end_statuses=None, end_execution_id=None):
queue = eventlet.Queue()
queue.put('')
self.queues.append(queue)

try:
while not self._stopped:
stop = False
while not self._stopped and not stop:
try:
# TODO: Move to common option
message = queue.get(timeout=cfg.CONF.stream.heartbeat)

if not message:
yield message
continue

event_name, body = message
# check to see if this is the last message to send.
if event_name == end_event:
if body is not None and \
body.status in end_statuses and \
end_execution_id is not None and \
body.id == end_execution_id:
stop = True
# TODO: We now do late filtering, but this could also be performed on the
# message bus level if we modified our exchange layout and utilize routing keys
# Filter on event name
Expand All @@ -109,7 +115,6 @@ def generator(self, events=None, action_refs=None, execution_ids=None):
LOG.debug('Skipping event "%s" with action_ref "%s"' % (event_name,
action_ref))
continue

# Filter on execution id
execution_id = self._get_execution_id_for_body(body=body)
if execution_ids and execution_id not in execution_ids:
Expand Down
74 changes: 74 additions & 0 deletions st2common/tests/unit/test_stream_generator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
# Copyright 2020 The StackStorm Authors.
# Copyright 2019 Extreme Networks, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import mock
import unittest2

from st2common.stream import listener


class MockBody(object):

def __init__(self, id):
self.id = id
self.status = "succeeded"


INCLUDE = "test"
END_EVENT = "test_end_event"
END_ID = "test_end_id"
EVENTS = [(INCLUDE, MockBody("notend")), (END_EVENT, MockBody(END_ID))]


class MockQueue():

def __init__(self):
self.items = EVENTS

def get(self, *args, **kwargs):
if len(self.items) > 0:
return self.items.pop(0)
return None

def put(self, event):
self.items.append(event)


class MockListener(listener.BaseListener):

def __init__(self, *args, **kwargs):
super(MockListener, self).__init__(*args, **kwargs)

def get_consumers(self, consumer, channel):
pass


class TestStream(unittest2.TestCase):

@mock.patch('st2common.stream.listener.BaseListener._get_action_ref_for_body')
@mock.patch('eventlet.Queue')
def test_generator(self, mock_queue,
get_action_ref_for_body):
get_action_ref_for_body.return_value = None
mock_queue.return_value = MockQueue()
mock_consumer = MockListener(connection=None)
mock_consumer._stopped = False
app_iter = mock_consumer.generator(events=INCLUDE,
end_event=END_EVENT,
end_statuses=["succeeded"],
end_execution_id=END_ID)
events = EVENTS.append('')
for index, val in enumerate(app_iter):
self.assertEquals(val, events[index])
5 changes: 4 additions & 1 deletion st2stream/st2stream/controllers/v1/executions.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,10 @@ def format(gen):

def make_response():
app_iter = itertools.chain(existing_output_iter(), new_output_iter())
res = Response(content_type='text/event-stream', app_iter=app_iter)
res = Response(headerlist=[("X-Accel-Buffering", "no"),
('Cache-Control', 'no-cache'),
("Content-Type", "text/event-stream; charset=UTF-8")],
app_iter=app_iter)
return res

res = make_response()
Expand Down
17 changes: 13 additions & 4 deletions st2stream/st2stream/controllers/v1/stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import six

from st2common import log as logging
from st2common.constants import action as action_constants
from st2common.router import Response
from st2common.util.jsonify import json_encode
from st2common.stream.listener import get_listener
Expand Down Expand Up @@ -54,16 +55,24 @@ def format(gen):


class StreamController(object):
def get_all(self, events=None, action_refs=None, execution_ids=None, requester_user=None):
def get_all(self, end_execution_id=None, end_event=None,
events=None, action_refs=None, execution_ids=None, requester_user=None):
events = events if events else DEFAULT_EVENTS_WHITELIST
action_refs = action_refs if action_refs else None
execution_ids = execution_ids if execution_ids else None

def make_response():
listener = get_listener(name='stream')
app_iter = format(listener.generator(events=events, action_refs=action_refs,
execution_ids=execution_ids))
res = Response(content_type='text/event-stream', app_iter=app_iter)
app_iter = format(listener.generator(events=events,
action_refs=action_refs,
end_event=end_event,
end_statuses=action_constants.LIVEACTION_COMPLETED_STATES,
end_execution_id=end_execution_id,
execution_ids=execution_ids))
res = Response(headerlist=[("X-Accel-Buffering", "no"),
('Cache-Control', 'no-cache'),
("Content-Type", "text/event-stream; charset=UTF-8")],
app_iter=app_iter)
return res

stream = make_response()
Expand Down