POC to avoid usage of ReplicationResult #7528
ankitsol wants to merge 2 commits into apache:HBASE-28957
Conversation
🎊 +1 overall
This message was automatically generated.

💔 -1 overall
This message was automatically generated.
```java
replicateContext.entries.stream().map(WAL.Entry::getEdit).flatMap(e -> e.getCells().stream())
  .forEach(this::checkCell);
return ReplicationResult.COMMITTED;
getReplicationSource().cleanupHFileRefsAndPersistOffsets(replicateContext.getEntries());
```
Why do we need to call this here?
```java
// replicate the batches to sink side.
parallelReplicate(replicateContext, batches);
return ReplicationResult.COMMITTED;
getReplicationSource().cleanupHFileRefsAndPersistOffsets(replicateContext.getEntries());
```
OK, I think I get why you call this method here: at this point we can be sure the WAL entries have been persisted, so it is OK for us to persist the offset. But for me, I prefer we follow the old way and call this in ReplicationSourceShipper.
> But for me, I prefer we follow the old way and call this in ReplicationSourceShipper.

I think it's doable. @ankitsol?
```diff
@@ -216,7 +216,7 @@ public int getTimeout() {
  * the context are assumed to be persisted in the target cluster.
```
What I mean is that we should add a method, maybe called beforePersistingReplicationOffset, and call it before we call updateLogPosition in the ReplicationSourceShipper method. For the old implementation we just do nothing, since we can be sure that everything is already persisted; for an S3-based endpoint, we close the writer to persist the data on S3.
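To make the proposal concrete, here is a minimal sketch of what such a hook could look like; the method name, the default-method placement, and the writer handle are all assumptions drawn from this thread, not the actual HBase API:

```java
import java.io.Closeable;
import java.io.IOException;

// Sketch only: beforePersistingReplicationOffset is the method proposed in
// this thread, not a committed HBase API.
interface ReplicationEndpointOffsetHook {
  /**
   * Invoked by ReplicationSourceShipper just before updateLogPosition().
   * When this returns, every entry shipped so far must be durable.
   */
  default void beforePersistingReplicationOffset() throws IOException {
    // Default: no-op. An endpoint that persists synchronously on every
    // replicate() call (e.g. HBaseInterClusterReplicationEndpoint) has
    // nothing extra to do here.
  }
}

// An S3-based endpoint would close its current output file so the shipped
// data becomes durable, then open a new file on the next shipment.
class S3EndpointSketch implements ReplicationEndpointOffsetHook {
  private Closeable currentWalFile; // hypothetical handle to the open file

  @Override
  public void beforePersistingReplicationOffset() throws IOException {
    if (currentWalFile != null) {
      currentWalFile.close(); // closing finalizes the object on S3
      currentWalFile = null;
    }
  }
}
```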
You cannot close the writer every time something is shipped, because closing and re-opening the same stream is a costly operation, if it is supported at all. We have to wait until enough data has been shipped (file size limit) or enough time has elapsed (time limit) before closing the current stream and opening a new one. This is controlled by the replication endpoint itself.
So you also need to change the logic in ReplicationSourceShipper to not always record the offset after shipping. And I do not think this can only be controlled by the replication endpoint: in ReplicationSourceShipper you know the size of the WALEntries, and you also know how much time has elapsed since the last recording, so the logic should be easy to implement there.
That's an interesting idea. @vinayakphegde @ankitsol wdyt?
@Apache9 Let's say the ReplicationSourceShipper controls when to record the offset. How would it know which kind of replication endpoint it is working with? Does it need to record the offset after every shipment, or use a time/size limit? Shall we make it a new attribute of the endpoints?
I prefer we control it by a time/size limit.

Even if the endpoint can persist the data after every shipment, we do not need to record the offset every time, right? We just need to make sure that once the ReplicationSourceShipper wants to record the offset, all the data before this offset has been persisted. So we can introduce a beforePersistingReplicationOffset method on the replication endpoint: if you persist the data after every shipment, you just do nothing; if it is an S3-based endpoint, we close the output file to persist the data.

In this way, the ReplicationSourceShipper does not need to know whether the endpoint persists the data after every shipment. And in the future, for HBaseInterClusterReplicationEndpoint, we could also introduce some asynchronous mechanism to increase performance.
@Apache9 This seems like a good approach. We also want it to be both time- and size-based.

I have two questions regarding the time-based approach:

- Should this time-based count run on a separate thread? Currently in `ContinuousBackupReplicationEndpoint` we implemented it as a separate thread.
- Where should we save the time/size-based context? `ReplicationSourceShipper` or `ReplicationSource`, considering `ReplicationSourceShipper` is itself a thread.
I think changing the default behavior to be size/time-based is OK. We can treat size = 0 as no size limit and size = -1 as persisting after every flush; this way we can make the default size limit -1 to keep the old behavior.
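If I am reading this right, the size check would look something like the sketch below; the parameter names are illustrative, not actual HBase configuration keys:

```java
// Proposed size-limit semantics, as I understand them:
//   sizeLimit == -1 : persist the offset after every flush (old behavior)
//   sizeLimit ==  0 : no size limit; only the time limit triggers persistence
//   sizeLimit  >  0 : persist once this many bytes have been shipped
static boolean shouldPersistBySize(long sizeLimit, long bytesSinceLastPersist) {
  if (sizeLimit == -1) {
    return true;   // default, keeps the old persist-every-time behavior
  }
  if (sizeLimit == 0) {
    return false;  // size never triggers; rely on the time limit
  }
  return bytesSinceLastPersist >= sizeLimit;
}
```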
> @Apache9 This seems like a good approach. We also want it to be both time- and size-based.
>
> I have two questions regarding the time-based approach:
>
> - Should this time-based count run on a separate thread? Currently in `ContinuousBackupReplicationEndpoint` we implemented it as a separate thread.
> - Where should we save the time/size-based context? `ReplicationSourceShipper` or `ReplicationSource`, considering `ReplicationSourceShipper` is itself a thread.
I think we can put the logic in ReplicationSourceShipper. There is a while loop in the thread: after every shipment, we calculate the size and time and determine whether we should persist the offset.
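A rough sketch of that loop, reusing the shouldPersistBySize helper from the earlier sketch; pollNextBatch, shipBatch, and persistOffset are placeholders, not the real ReplicationSourceShipper methods:

```java
long bytesSinceLastPersist = 0;
long lastPersistTime = System.currentTimeMillis();

while (isActive()) {
  WALEntryBatch batch = pollNextBatch();   // placeholder: read the next batch
  shipBatch(batch);                        // placeholder: endpoint.replicate(...)
  bytesSinceLastPersist += batch.getHeapSize(); // assumes the batch exposes its size

  long elapsed = System.currentTimeMillis() - lastPersistTime;
  if (shouldPersistBySize(sizeLimit, bytesSinceLastPersist) || elapsed >= timeLimitMs) {
    // Let the endpoint make the shipped data durable first: a no-op for the
    // default endpoint, close-the-current-S3-file for the S3-based one.
    endpoint.beforePersistingReplicationOffset();
    persistOffset(batch);                  // placeholder: updateLogPosition(...)
    bytesSinceLastPersist = 0;
    lastPersistTime = System.currentTimeMillis();
  }
}
```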
```java
  }
}

private ReplicationResult getReplicationResult() {
```
As said above, we should keep these methods here, and before calling them, we call the method on the ReplicationEndpoint to flush the data out.