
[Improvement] Optimize data flushing and memory usage for huge partitions to improve stability #378

Closed · 8 tasks done
zuston opened this issue Dec 1, 2022 · 30 comments

Comments

@zuston
Member

zuston commented Dec 1, 2022

Code of Conduct

Search before asking

  • I have searched in the issues and found no similar issues.

What would you like to be improved?

Background

In our internal Uniffle cluster, when using the MEMORY_LOCALFILE storage type, a few single huge partitions make a disk reach its high watermark, which takes the whole shuffle server out of service because its memory fills up.

Possible solutions

  1. Use the storage type of MEMORY_LOCALFILE_HDFS with the fallback strategy of LocalStorageManagerFallbackStrategy. [ISSUE-163][FEATURE] Write to hdfs when local disk can't be write #235

     After testing this mechanism, I found some bugs which make the fallback invalid.

  2. Introduce a new dynamic disk selection strategy. [Improvement] Optimize local disk selection strategy #373
  3. Introduce a partition-size-based strategy to flush single huge partition data to HDFS.

Currently, I prefer the third solution, in which we would use cold storage (HDFS) when the cumulative size of a particular partition is above a specific threshold (like 50 GB?). Actually, if we exclude the few huge partitions, the free disk ratio of the whole shuffle server is 20%-30%.
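
For illustration, here is a minimal sketch of such a partition-size-based selection (hypothetical class and method names, not the actual Uniffle code):

```java
// Hypothetical sketch: route a partition's flush data to cold storage (HDFS)
// once its cumulative size crosses a configured threshold; smaller partitions
// keep using the local disk.
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class PartitionSizeBasedSelector {
  enum StorageType { LOCALFILE, HDFS }

  // Cumulative bytes received per partition key, e.g. "appId_shuffleId_partitionId".
  private final Map<String, Long> partitionBytes = new ConcurrentHashMap<>();
  private final long hugePartitionThreshold; // e.g. 50L * 1024 * 1024 * 1024 for 50 GB

  public PartitionSizeBasedSelector(long hugePartitionThreshold) {
    this.hugePartitionThreshold = hugePartitionThreshold;
  }

  /** Record the incoming buffer size and decide where this partition should flush. */
  public StorageType select(String partitionKey, long bufferSizeBytes) {
    long total = partitionBytes.merge(partitionKey, bufferSizeBytes, Long::sum);
    return total >= hugePartitionThreshold ? StorageType.HDFS : StorageType.LOCALFILE;
  }
}
```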

Reasons

  1. We could avoid a hotspot on a single shuffle server by using HDFS to distribute the pressure.
  2. It is especially useful when the disk space of the shuffle servers is limited and cannot hold a super large shuffle partition.
  3. Compared with the first solution, it is more restrained in its use of HDFS and will not cause as much pressure. (Actually, we don't want to push much shuffle data to HDFS with 3 replicas.)
  4. Compared with the second solution, it is more effective and has better performance.
  5. Only the storage of the few huge-partition jobs is downgraded to HDFS, which is effective, especially when the HDFS cluster's performance is not good.

Final solution

The solution for handling huge partitions is to flush them to HDFS directly and limit their memory usage; the subtasks are as follows.

  1. Speed up flushing partition data to HDFS.
  2. Introduce a memory usage limitation for huge partitions

How should we improve?

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!
@zuston
Member Author

zuston commented Dec 1, 2022

After discussing this with my mentor, I have raised a new proposal. Feel free to discuss it further. @xianjingfeng @jerqi

@jerqi
Contributor

jerqi commented Dec 1, 2022

After discussing this with my mentor, I have raised a new proposal. Feel free to discuss it further. @xianjingfeng @jerqi

What's the advantage of this strategy compared with the current strategy?

@zuston
Member Author

zuston commented Dec 1, 2022

After discussing this with my mentor, I have raised a new proposal. Feel free to discuss it further. @xianjingfeng @jerqi

What's the advantage of this strategy compared with the current strategy?

Reasons have been attached.

@jerqi
Contributor

jerqi commented Dec 1, 2022

My concern about this strategy is that we need to maintain more partition metadata. Will it add more GC pressure?

@jerqi
Contributor

jerqi commented Dec 1, 2022

After discussing this with my mentor, I have raised a new proposal. Feel free to discuss it further. @xianjingfeng @jerqi

What's the advantage of this strategy compared with the current strategy?

Reasons have been attached.

The current strategy already picks the largest partitions to write to HDFS. You can see our implementation.

@zuston
Member Author

zuston commented Dec 1, 2022

Yes, I have tested this implementation, but the fallback to HDFS looks broken. That's not a big problem, though.

The key point of the proposal is to avoid putting too much data into HDFS; we don't want to store shuffle data with 3 replicas in HDFS. And sometimes the performance is not good.

@jerqi
Contributor

jerqi commented Dec 1, 2022

Yes, I have tested this implementation, but the fallback to HDFS looks broken. That's not a big problem, though.

The key point of the proposal is to avoid putting too much data into HDFS; we don't want to store shuffle data with 3 replicas in HDFS. And sometimes the performance is not good.

You can set the data replica count via the Hadoop configuration.

@zuston
Member Author

zuston commented Dec 1, 2022

This will place a heavy burden on HDFS ops.

@jerqi
Contributor

jerqi commented Dec 1, 2022

This will place a heavy burden on HDFS ops.

You misunderstood me. The replica number is a client-side configuration; we just modify Spark's client. See https://github.com/apache/incubator-uniffle/blob/884921b6d7c5452f6f3548ebc87840071c6be81f/proto/src/main/proto/Rss.proto#L422 for details.

@zuston
Member Author

zuston commented Dec 1, 2022

Let me take a look.

@jerqi
Contributor

jerqi commented Dec 1, 2022

The data written to HDFS must be larger than a minimum size. If we write small pieces of data, it will hurt HDFS performance.

@zuston
Member Author

zuston commented Dec 1, 2022

The data written to HDFS must be larger than a minimum size. If we write small pieces of data, it will hurt HDFS performance.

I know; that's why I hope to use HDFS as little as possible, enabling it only for huge partition data.

@jerqi
Contributor

jerqi commented Dec 1, 2022

If you only write data to HDFS when the partition size > 50 GB and the event size > 64 MB, I feel that it's OK.

@xianjingfeng
Member

In our production environment, the disks are sufficient most of the time. It is difficult to set a partition size threshold, and doing so will reduce disk utilization. So we prefer to write to HDFS when the disk is full.

@ChengbingLiu

ChengbingLiu commented Dec 8, 2022

@jerqi Here is our scenario:

We co-locate Uniffle shuffle servers with our YARN NodeManagers, so the shuffle servers have limited memory (10-20 GB) and limited disk space (1-5 TB). However, there can be as many as 1k shuffle tasks on a shuffle server at the same time, with each task taking up only 1~2 MB of memory on average.

When a super large shuffle partition comes in, one of the disks quickly reaches its high watermark and memory starts to flush directly to HDFS. Since each task holds only a small amount of data (1-2 MB) in memory, it may flush (append) to HDFS only 1~2 MB at a time, even though the configured flush threshold rss.server.flush.cold.storage.threshold.size is 64 MB.

IMO, the flush threshold works only with two preconditions:

  1. The shuffle server has enough memory compared to concurrent shuffle tasks (otherwise the threshold may not be reached)
  2. The shuffle server has enough disk space (otherwise small partitions may fallback to cold storage)

If either condition is not met, it's better to have another mechanism to better utilize the limited memory/disk resources, instead of wasting them on an uncommonly large partition.

Please correct me if I'm wrong.
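
To make the failure mode above concrete, a simplified sketch of the decision being described (hypothetical names; it assumes the disk-full fallback forces HDFS writes regardless of event size, which is not Uniffle's real flush path):

```java
// Simplified illustration only. With ~1,000 concurrent tasks sharing 10-20 GB of memory,
// each flush event carries only 1-2 MB, so the 64 MB cold-storage threshold is effectively
// never reached; once a local disk hits its high watermark, the fallback still appends
// these tiny events to HDFS.
public class ColdStorageDecision {
  // Mirrors rss.server.flush.cold.storage.threshold.size = 64 MB in the scenario above.
  static final long COLD_THRESHOLD_BYTES = 64L * 1024 * 1024;

  static boolean flushToHdfs(long eventSizeBytes, boolean localDiskFull) {
    if (localDiskFull) {
      return true; // forced fallback: even 1-2 MB appends go to HDFS
    }
    return eventSizeBytes >= COLD_THRESHOLD_BYTES; // rarely true with tiny per-task buffers
  }
}
```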

@jerqi
Contributor

jerqi commented Dec 8, 2022

@jerqi Here is our scenario:

We co-locate Uniffle shuffle servers with our YARN NodeManagers, so the shuffle servers have limited memory (10-20 GB) and limited disk space (1-5 TB). However, there can be as many as 1k shuffle tasks on a shuffle server at the same time, with each task taking up only 1~2 MB of memory on average.

When a super large shuffle partition comes in, one of the disks quickly reaches its high watermark and memory starts to flush directly to HDFS. Since each task holds only a small amount of data (1-2 MB) in memory, it may flush (append) to HDFS only 1~2 MB at a time, even though the configured flush threshold rss.server.flush.cold.storage.threshold.size is 64 MB.

IMO, the flush threshold works only with two preconditions:

  1. The shuffle server has enough memory compared to concurrent shuffle tasks (otherwise the threshold may not be reached)
  2. The shuffle server has enough disk space (otherwise small partitions may fallback to cold storage)

If either condition is not met, it's better to have another mechanism to better utilize the limited memory/disk resources, instead of wasting them on an uncommonly large partition.

Please correct me if I'm wrong.

Yes, you're right. But if we write only 2 MB to HDFS at a time, HDFS will have poor performance.

@zuston
Member Author

zuston commented Dec 8, 2022

In our production environment, the disks are sufficient most of the time. It is difficult to set a partition size threshold, and doing so will reduce disk utilization. So we prefer to write to HDFS when the disk is full.

Yes, it is difficult to set the partition size threshold. I still think the existing mechanism can hardly handle a huge single partition, especially in our scenario mentioned by @ChengbingLiu. I want to share some attempts at solving this problem that failed.

First case:

I enabled the MEMORY_LOCALFILE_HDFS storage type with rss.server.flush.cold.storage.threshold.size set to 100G. The buffer capacity was 10 GB, and the storage fallback strategy was LocalStorageFallbackStrategy.

When a huge single partition arrived, it quickly made the disk reach its high watermark, and data started to flush to HDFS directly. Unfortunately, because this huge partition occupied a large amount of memory (6 GB), the shuffle dataFlushEvent was large, which took a long time to consume. This caused other jobs to be unable to write data normally. The picture is shown below.

[Screenshot: ss1]

Second case:

After the above attempt, I set rss.server.flush.cold.storage.threshold.size to 128M, meaning that once a partition buffer reaches 128 MB, it is flushed directly to HDFS.

Unfortunately, this is still not enough. Although the huge partition generates several dataFlushEvents waiting to be flushed to a single HDFS file, its write handler holds the write lock and writes them one by one. This keeps the flush thread pool busy while the writing speed remains slow.

  1. This also causes other jobs to be unable to write data normally.
  2. Especially when a shuffle server hosts only a few partitions, the local disks stay almost empty, which is unreasonable.

The case's picture is shown below.

[Screenshot: ss2]

Possible solution

The above problems are hard to avoid in a production environment. I think we need to introduce a new strategy to solve them. The following solutions are just drafts:

  1. Introduce a blacklist of apps based on flushed size; once the threshold is reached, the shuffle server will only grant the app limited memory and will flush its data to HDFS directly.
  2. The client could get multiple candidates (multiple shuffle servers for one partition, not multiple replicas). Once a write fails on one shuffle server, the client could write the data to another shuffle server, which would distribute the pressure of the huge data across the Uniffle cluster. After all, the overall disk utilization of the cluster is relatively low.

@jerqi
Contributor

jerqi commented Dec 8, 2022

Maybe we could introduce multi-threaded writing to HDFS. If the file is too big, we could split it into multiple files. ByteDance CSS has a similar concept: if a file exceeds the size limit, we open and write another file.

@zuston
Member Author

zuston commented Dec 9, 2022

Maybe we could introduce multi-threaded writing to HDFS. If the file is too big, we could split it into multiple files.

Yes. The key problem is the low speed of writing to a single data file.

ByteDance CSS has a similar concept: if a file exceeds the size limit, we open and write another file.

Let me take a look. But I think just writing another file is not a good solution; it won't improve the write concurrency for multiple events of the same partition.

@jerqi
Contributor

jerqi commented Dec 9, 2022

Maybe we could introduce multi-threaded writing to HDFS. If the file is too big, we could split it into multiple files.

Yes. The key problem is the low speed of writing to a single data file.

ByteDance CSS has a similar concept: if a file exceeds the size limit, we open and write another file.

Let me take a look. But I think just writing another file is not a good solution; it won't improve the write concurrency for multiple events of the same partition.

I mean that we can write multiple files at the same time.
We can tryLock a file's lock; we have multiple locks. If tryLock succeeds, we write to that file. If it fails, we try another file's lock.
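
As a rough illustration of this idea (a simplified sketch in the spirit of what later became the pooled HDFS write handler; class and method names here are hypothetical):

```java
// Sketch: keep a pool of per-file locks for one partition. A flush thread tries each
// file's lock in turn and writes to the first file it can acquire, so several flush
// threads can write different HDFS files of the same partition concurrently.
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.ReentrantLock;

public class PooledPartitionFileWriter {
  private final List<ReentrantLock> fileLocks = new ArrayList<>();
  private final int poolSize;

  public PooledPartitionFileWriter(int poolSize) {
    this.poolSize = poolSize;
    for (int i = 0; i < poolSize; i++) {
      fileLocks.add(new ReentrantLock());
    }
  }

  /** Probe the file locks until one is free, then append the data to that file. */
  public void write(byte[] data) {
    while (true) {
      for (int i = 0; i < poolSize; i++) {
        ReentrantLock lock = fileLocks.get(i);
        if (lock.tryLock()) {
          try {
            appendToFile(i, data);
            return;
          } finally {
            lock.unlock();
          }
        }
      }
      Thread.onSpinWait(); // all files busy, retry shortly
    }
  }

  private void appendToFile(int fileIndex, byte[] data) {
    // Placeholder for the real HDFS append of the index/data files numbered by fileIndex.
  }
}
```

With N file locks, up to N flush threads of the same partition can make progress at once, which is where the rough (HDFS speed) * (number of files) estimate below comes from.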

@zuston
Member Author

zuston commented Dec 9, 2022

I mean that we can write multiple files at the same time.
We can tryLock a file's lock; we have multiple locks. If tryLock succeeds, we write to that file. If it fails, we try another file's lock.

Got it. It's a workable option; the flush speed could improve to roughly (HDFS speed) * (number of files).

And I think we also need to limit the huge partition app's writing speed to avoid affecting other jobs.

@jerqi
Contributor

jerqi commented Dec 9, 2022

I mean that we can write multiple files at the same time.
We can tryLock a file's lock; we have multiple locks. If tryLock succeeds, we write to that file. If it fails, we try another file's lock.

Got it. It's a workable option; the flush speed could improve to roughly (HDFS speed) * (number of files).

And I think we also need to limit the huge partition app's writing speed to avoid affecting other jobs.

VIP Shop has similar ideas. cc @Gustfh

@jerqi
Contributor

jerqi commented Dec 9, 2022

I mean that we can write multiple files at the same time.
We can tryLock a file's lock; we have multiple locks. If tryLock succeeds, we write to that file. If it fails, we try another file's lock.

Got it. It's a workable option; the flush speed could improve to roughly (HDFS speed) * (number of files).

I feel that it's necessary, because it would be too slow if the server used only a single thread to flush a large partition, even though we have multiple servers.

@zuston
Member Author

zuston commented Dec 9, 2022

I feel that it's necessary, because it would be too slow if the server used only a single thread to flush a large partition, even though we have multiple servers.

Makes sense. I think I will do this optimization.

@zuston
Member Author

zuston commented Dec 9, 2022

I feel that it's necessary, because it would be too slow if the server used only a single thread to flush a large partition, even though we have multiple servers.

Makes sense. I think I will do this optimization.

Drafted a PR: #396.

This PR only solves the problem mentioned in the second case. If the ShuffleDataFlushEvent is very large (say 5 GB), it is still handled by a single thread when flushed to HDFS. So we also need to split a huge ShuffleDataFlushEvent into multiple smaller events to improve the writing speed when flushing to HDFS.
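
For illustration, a minimal sketch of that follow-up idea (hypothetical helper, not the actual Uniffle implementation): split one oversized flush payload into bounded-size pieces so several flush threads can handle them in parallel. A real implementation would split at shuffle block boundaries rather than raw byte offsets.

```java
// Sketch: split a huge flush payload into chunks of at most maxEventBytes each,
// so that each chunk can be dispatched as its own flush event.
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public final class FlushEventSplitter {
  private FlushEventSplitter() {}

  public static List<byte[]> split(byte[] hugeEventData, int maxEventBytes) {
    List<byte[]> smallerEvents = new ArrayList<>();
    for (int offset = 0; offset < hugeEventData.length; offset += maxEventBytes) {
      int end = Math.min(offset + maxEventBytes, hugeEventData.length);
      smallerEvents.add(Arrays.copyOfRange(hugeEventData, offset, end));
    }
    return smallerEvents;
  }
}
```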

jerqi pushed a commit that referenced this issue Dec 13, 2022
…DFS storage (#396)

### What changes were proposed in this pull request?
1. Introduce the `PooledHdfsShuffleWriteHandler` to support writing a single partition to multiple HDFS files concurrently.

### Why are the changes needed?
As mentioned in #378 (comment), the HDFS writing speed is too slow and writes cannot happen concurrently. Especially when a huge partition exists, this problem slows down other apps due to the limited memory.

So improving the writing speed is an important factor in flushing the huge partition to HDFS quickly.

### Does this PR introduce _any_ user-facing change?
Yes.

### How was this patch tested?
1. UTs
@zuston
Member Author

zuston commented Dec 21, 2022

Introduce a blacklist of apps based on flushed size; once the threshold is reached, the shuffle server will only grant the app limited memory and will flush its data to HDFS directly.

If the huge partition has a high writing speed while the flushing speed is slow, most of the memory will be occupied by the huge partition's app. To ensure the stability of other apps, I prefer limiting the memory usage of huge partitions. WDYT? @jerqi cc @advancedxy
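
A minimal sketch of such a memory limitation (hypothetical names and thresholds; illustration only, not the change that was eventually merged):

```java
// Sketch: once a partition's cumulative size marks it as huge, cap how much of the
// server's buffer memory it may hold; rejected buffers force the client to retry later,
// leaving memory available for regular apps. Not fully thread-safe; illustration only.
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

public class HugePartitionMemoryLimiter {
  private final long hugePartitionSizeThreshold;        // cumulative bytes to mark a partition as huge
  private final long maxInMemoryBytesPerHugePartition;  // memory a huge partition may occupy

  private final Map<String, AtomicLong> cumulativeBytes = new ConcurrentHashMap<>();
  private final Map<String, AtomicLong> inMemoryBytes = new ConcurrentHashMap<>();

  public HugePartitionMemoryLimiter(long hugePartitionSizeThreshold,
                                    long maxInMemoryBytesPerHugePartition) {
    this.hugePartitionSizeThreshold = hugePartitionSizeThreshold;
    this.maxInMemoryBytesPerHugePartition = maxInMemoryBytesPerHugePartition;
  }

  /** Returns true if the incoming buffer may be accepted into server memory. */
  public boolean tryAccept(String partitionKey, long bufferBytes) {
    long cumulative = cumulativeBytes.computeIfAbsent(partitionKey, k -> new AtomicLong()).get();
    long inMemory = inMemoryBytes.computeIfAbsent(partitionKey, k -> new AtomicLong()).get();
    boolean huge = cumulative >= hugePartitionSizeThreshold;
    if (huge && inMemory + bufferBytes > maxInMemoryBytesPerHugePartition) {
      return false; // the client should back off and retry
    }
    cumulativeBytes.get(partitionKey).addAndGet(bufferBytes);
    inMemoryBytes.get(partitionKey).addAndGet(bufferBytes);
    return true;
  }

  /** Called after the partition's buffered data has been flushed to storage. */
  public void onFlushed(String partitionKey, long bufferBytes) {
    inMemoryBytes.computeIfAbsent(partitionKey, k -> new AtomicLong()).addAndGet(-bufferBytes);
  }
}
```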

@zuston
Member Author

zuston commented Jan 6, 2023

This issue will track all the huge-partition optimizations, and all sub-tasks will be linked to it. The solution for handling huge partitions is to flush them to HDFS directly and limit their memory usage; the subtasks are as follows.

  1. Speed up flushing partition data to HDFS.
  2. Introduce a memory usage limitation for huge partitions
  3. Support splitting a huge event into multiple smaller events that can be flushed concurrently, to speed up flushing

cc @jerqi @advancedxy I will create some subtask issues and PRs; feel free to discuss further.

@zuston zuston pinned this issue Jan 6, 2023
@zuston zuston changed the title [Improvement] Introduce partition size based strategy to flush single huge partition data to HDFS [Improvement] Optimize data flushing and memory usage for huge partitions to improve stability Jan 6, 2023
@jerqi
Contributor

jerqi commented Jan 6, 2023

This issue will track all the huge-partition optimizations, and all sub-tasks will be linked to it. The solution for handling huge partitions is to flush them to HDFS directly and limit their memory usage; the subtasks are as follows.

  1. Speed up flushing partition data to HDFS.

  2. Introduce a memory usage limitation for huge partitions

    • Record every partition's data size in ShuffleTaskInfo
    • Introduce a storage selector strategy (to support flushing huge partitions to HDFS directly) in MultipleStorageManager to replace the fallback strategy
    • Introduce more metrics to monitor huge partitions, and so on
  3. Support splitting a huge event into multiple smaller events that can be flushed concurrently, to speed up flushing

cc @jerqi @advancedxy I will create some subtask issues and PRs; feel free to discuss further.

It's ok for me.

zuston added a commit that referenced this issue Jan 6, 2023
…or one app (#458)


### What changes were proposed in this pull request?

Record every partition data size for one app

### Why are the changes needed?

This is a subtask for #378

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

1. UTs
@advancedxy
Contributor

This issue will track all the huge-partition optimizations, and all sub-tasks will be linked to it. The solution for handling huge partitions is to flush them to HDFS directly and limit their memory usage; the subtasks are as follows.

  1. Speed up flushing partition data to HDFS.

  2. Introduce a memory usage limitation for huge partitions

  3. Support splitting a huge event into multiple smaller events that can be flushed concurrently, to speed up flushing

cc @jerqi @advancedxy I will create some subtask issues and PRs; feel free to discuss further.

Missed this comment. LGTM.

zuston added a commit that referenced this issue Jan 17, 2023
…ata flush (#471)


### What changes were proposed in this pull request?
1. Introduce a memory usage limit for huge partitions to keep regular partition writing stable
2. Once a partition is marked as a huge partition, single-buffer flush will be triggered whenever its buffer size exceeds the `rss.server.single.buffer.flush.threshold` value, regardless of whether single-buffer flush is enabled

### Why are the changes needed?
1. To solve the problems mentioned in #378

### Does this PR introduce _any_ user-facing change?
Yes. 

### How was this patch tested?
1. UTs
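
The flush trigger described in the second change of the commit above can be sketched roughly as follows (hypothetical class name; only the decision logic from the commit message):

```java
// Sketch: once a partition is marked as huge, flush its buffer as soon as it exceeds
// rss.server.single.buffer.flush.threshold, even if single-buffer flush is globally disabled.
public class SingleBufferFlushPolicy {
  private final boolean singleBufferFlushEnabled;  // global switch
  private final long singleBufferFlushThreshold;   // rss.server.single.buffer.flush.threshold

  public SingleBufferFlushPolicy(boolean singleBufferFlushEnabled, long singleBufferFlushThreshold) {
    this.singleBufferFlushEnabled = singleBufferFlushEnabled;
    this.singleBufferFlushThreshold = singleBufferFlushThreshold;
  }

  public boolean shouldFlush(long bufferSizeBytes, boolean isHugePartition) {
    if (bufferSizeBytes <= singleBufferFlushThreshold) {
      return false;
    }
    // Huge partitions bypass the global switch; normal partitions respect it.
    return isHugePartition || singleBufferFlushEnabled;
  }
}
```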
@zuston
Member Author

zuston commented Jan 27, 2023

Closed. Feel free to reopen it if there are any problems.

@zuston zuston closed this as completed Jan 27, 2023
@zuston zuston unpinned this issue Jan 28, 2023
zuston added a commit that referenced this issue Mar 1, 2023
…ategy (#621)

### What changes were proposed in this pull request?

1. Introduce a storage manager selector to support more selector strategies for `MultiStorageManager`
2. Introduce the conf `rss.server.multistorage.manager.selector.class` to support different flush strategies, e.g. flushing huge partitions directly to HDFS while normal partitions are flushed to DISK when single buffer flush is enabled.

### Why are the changes needed?
Solving the problem mentioned in #378 (comment).

In the current codebase, when a huge partition is encountered and single buffer flush is enabled, normal partition data will also be flushed to HDFS (which I don't want, because the local disk is free and its flushing speed is faster than HDFS). But if single buffer flush is disabled, the huge partition's events created before it is marked as huge may be big, which causes slow flushing and then causes buffer allocation requests to fail.

Based on the above problems, this PR makes a single event carrying 100 MB flush to either HDFS or a local file, leveraging the conf `rss.server.multistorage.manager.selector.class`.

### Does this PR introduce _any_ user-facing change?
Yes. Doc will be updated later.

### How was this patch tested?
1. UTs
advancedxy pushed a commit to advancedxy/incubator-uniffle that referenced this issue Mar 21, 2023
…ategy (apache#621)

xianjingfeng pushed a commit to xianjingfeng/incubator-uniffle that referenced this issue Apr 5, 2023
…ategy (apache#621)

rickyma added a commit to rickyma/incubator-uniffle that referenced this issue Apr 25, 2024
zuston pushed a commit that referenced this issue Apr 26, 2024
### What changes were proposed in this pull request?

Fix the metric huge_partition_num.

### Why are the changes needed?

A follow-up PR for: #494.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Existing UTs.
jerqi pushed a commit that referenced this issue Apr 30, 2024