From 878bb23366dd099e22d620b40248680c6f74b443 Mon Sep 17 00:00:00 2001
From: "haowen.han@mthreads.com" <haowen.han@mthreads.com>
Date: Mon, 13 May 2024 13:29:24 +0800
Subject: [PATCH] Revert "fix test_communicator_half_async random core;test=develop (#62092)"

This reverts commit dba99929f6f1e349d586ffa5455117d2249b7141.
---
 .../run_server_for_communicator_half_async.py |  38 ------
 .../fleet/test_communicator_half_async.py     | 118 +++++++++---------
 2 files changed, 60 insertions(+), 96 deletions(-)
 delete mode 100644 test/collective/fleet/run_server_for_communicator_half_async.py

diff --git a/test/collective/fleet/run_server_for_communicator_half_async.py b/test/collective/fleet/run_server_for_communicator_half_async.py
deleted file mode 100644
index 14d8fd80331b35..00000000000000
--- a/test/collective/fleet/run_server_for_communicator_half_async.py
+++ /dev/null
@@ -1,38 +0,0 @@
-# Copyright (c) 2023 PaddlePaddle Authors. All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import os
-
-from test_communicator_half_async import TestCommunicatorHalfAsyncEnd2End
-
-import paddle
-
-paddle.enable_static()
-
-pipe_name = os.getenv("PIPE_FILE")
-
-
-class RunServer(TestCommunicatorHalfAsyncEnd2End):
-    def runTest(self):
-        pass
-
-
-os.environ["TRAINING_ROLE"] = "PSERVER"
-os.environ["http_proxy"] = ""
-os.environ["https_proxy"] = ""
-half_run_server = RunServer()
-with open(pipe_name, 'w') as pipe:
-    pipe.write('done')
-
-half_run_server.run_ut()
diff --git a/test/collective/fleet/test_communicator_half_async.py b/test/collective/fleet/test_communicator_half_async.py
index 687337f25ab2ae..25e5302fb444fd 100644
--- a/test/collective/fleet/test_communicator_half_async.py
+++ b/test/collective/fleet/test_communicator_half_async.py
@@ -15,7 +15,6 @@
 import os
 import subprocess
 import sys
-import tempfile
 import unittest
 
 import numpy
@@ -24,7 +23,6 @@
 from paddle import base
 from paddle.distributed import fleet
 from paddle.distributed.fleet.base import role_maker
-from paddle.distributed.utils.launch_utils import find_free_ports
 
 paddle.enable_static()
 
@@ -32,44 +30,25 @@
 class TestCommunicatorHalfAsyncEnd2End(unittest.TestCase):
     def net(self):
         x = paddle.static.data(name='x', shape=[-1, 13], dtype='float32')
-        x1 = paddle.static.data(
-            name='x1', shape=[-1, 1], dtype='int64', lod_level=1
-        )
-
-        emb = paddle.static.nn.embedding(
-            input=x1,
-            size=[10000, 10],
-            param_attr=base.ParamAttr(
-                name="embedding",
-                initializer=paddle.nn.initializer.Constant(value=0.01),
-            ),
-            is_sparse=True,
-        )
-
-        pool = paddle.static.nn.sequence_lod.sequence_pool(
-            input=emb.squeeze(-2), pool_type="sum"
-        )
-        z = paddle.concat([x, pool], axis=1)
-
-        y_predict = paddle.static.nn.fc(x=z, size=1)
+        y_predict = paddle.static.nn.fc(x, size=1, activation=None)
         y = paddle.static.data(name='y', shape=[-1, 1], dtype='float32')
+
         cost = paddle.nn.functional.square_error_cost(input=y_predict, label=y)
         avg_cost = paddle.mean(cost)
-        return avg_cost, x, x1, y
+        return avg_cost, x, y
 
     def fake_reader(self):
         def reader():
             for i in range(10000):
                 x = numpy.random.random((1, 13)).astype('float32')
-                z = numpy.random.randint(0, 9999, (1, 1)).astype('int64')
                 y = numpy.random.randint(0, 2, (1, 1)).astype('int64')
-                yield x, z, y
+                yield x, y
 
         return reader
 
     def run_pserver(self, role, strategy):
         fleet.init(role)
-        avg_cost, x, z, y = self.net()
+        avg_cost, x, y = self.net()
         optimizer = paddle.optimizer.SGD(0.01)
         optimizer = fleet.distributed_optimizer(optimizer, strategy)
         optimizer.minimize(avg_cost)
@@ -82,20 +61,20 @@ def run_trainer(self, role, strategy):
         exe = base.Executor(place)
 
         fleet.init(role)
-        avg_cost, x, z, y = self.net()
+        avg_cost, x, y = self.net()
         optimizer = paddle.optimizer.SGD(0.01)
         optimizer = fleet.distributed_optimizer(optimizer, strategy)
         optimizer.minimize(avg_cost)
 
-        exe.run(base.default_startup_program())
+        exe.run(paddle.static.default_startup_program())
         fleet.init_worker()
 
         train_reader = paddle.batch(self.fake_reader(), batch_size=24)
-        feeder = base.DataFeeder(place=place, feed_list=[x, z, y])
+        feeder = base.DataFeeder(place=place, feed_list=[x, y])
 
         for batch_id, data in enumerate(train_reader()):
             exe.run(
-                base.default_main_program(),
+                paddle.static.default_main_program(),
                 feed=feeder.feed(data),
                 fetch_list=[],
             )
@@ -103,58 +82,81 @@
         fleet.stop_worker()
 
     def run_ut(self):
-        training_role = os.getenv("TRAINING_ROLE", "TRAINER")
-
-        os.environ["PADDLE_PSERVER_NUMS"] = "1"
-        os.environ["PADDLE_TRAINERS_NUM"] = "1"
-        os.environ["PADDLE_TRAINER_ID"] = "0"
-        os.environ["PADDLE_TRAINERS_NUM"] = "1"
-        os.environ["POD_IP"] = "127.0.0.1"
-
-        role = role_maker.PaddleCloudRoleMaker()
-
         strategy = paddle.distributed.fleet.DistributedStrategy()
         strategy.a_sync = True
 
+        training_role = os.getenv("TRAINING_ROLE", "TRAINER")
+
+        role = role_maker.UserDefinedRoleMaker(
+            current_id=0,
+            role=role_maker.Role.WORKER
+            if training_role == "TRAINER"
+            else role_maker.Role.SERVER,
+            worker_num=1,
+            server_endpoints=["127.0.0.1:6002"],
+        )
+
         if training_role == "TRAINER":
             self.run_trainer(role, strategy)
         else:
             self.run_pserver(role, strategy)
 
     def test_communicator(self):
-        temp_dir = tempfile.TemporaryDirectory()
-        pipe_name = os.path.join(temp_dir.name, 'mypipe')
-        try:
-            os.mkfifo(pipe_name)
-        except OSError as oe:
-            print(f"Failed to create pipe: {oe}")
+        run_server_cmd = """
+
+import sys
+import os
 
-        port = find_free_ports(1).pop()
+import time
+import threading
+import subprocess
+import unittest
+import numpy
+
+from test_communicator_half_async import TestCommunicatorHalfAsyncEnd2End
+
+import paddle
+import paddle.base as base
+import paddle.distributed.fleet as fleet
+import paddle.distributed.fleet.base.role_maker as role_maker
+
+paddle.enable_static()
+
+class RunServer(TestCommunicatorHalfAsyncEnd2End):
+    def runTest(self):
+        pass
 
-        os.environ["TRAINING_ROLE"] = "PSERVER"
-        os.environ["PADDLE_PORT"] = str(port)
-        os.environ["PADDLE_PSERVERS_IP_PORT_LIST"] = f"127.0.0.1:{port}"
-        os.environ["PIPE_FILE"] = pipe_name
+os.environ["http_proxy"] = ""
+os.environ["https_proxy"] = ""
+os.environ["TRAINING_ROLE"] = "PSERVER"
+half_run_server = RunServer()
+half_run_server.run_ut()
+"""
 
+        server_file = "run_server_for_communicator_haflaysnc.py"
+        with open(server_file, "w") as wb:
+            wb.write(run_server_cmd)
+        os.environ["TRAINING_ROLE"] = "PSERVER"
         _python = sys.executable
-        server_file = "run_server_for_communicator_half_async.py"
-        ps_cmd = f"{_python} {server_file}"
 
+        ps_cmd = f"{_python} {server_file}"
         ps_proc = subprocess.Popen(
             ps_cmd.strip().split(" "),
             stdout=subprocess.PIPE,
             stderr=subprocess.PIPE,
         )
 
-        with open(pipe_name, 'r') as pipe:
-            start_command = pipe.read()
-
+        os.environ["http_proxy"] = ""
+        os.environ["https_proxy"] = ""
         os.environ["TRAINING_ROLE"] = "TRAINER"
+        os.environ["FLAGS_communicator_send_queue_size"] = "1"
+        os.environ["FLAGS_communicator_max_merge_var_num"] = "1"
 
         self.run_ut()
         ps_proc.kill()
-        ps_proc.wait()
-        outs, errs = ps_proc.communicate()
+
+        if os.path.exists(server_file):
+            os.remove(server_file)
 
 
 if __name__ == '__main__':