You can choose to expose a different object such as ShardId for the unkeyed scenario instead of doing something like ShardedKey<X> with a null/empty X.
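The two options named in this comment can be sketched as follows. This is only an illustration: the class names mirror the comment, but the fields and the use of raw bytes for the shard id are assumptions, not the actual SDK types.

```python
from dataclasses import dataclass
from typing import Generic, Optional, TypeVar

K = TypeVar("K")


@dataclass(frozen=True)
class ShardId:
    """Opaque, runner-chosen shard identifier (hypothetical shape)."""
    value: bytes


@dataclass(frozen=True)
class ShardedKey(Generic[K]):
    """A user key paired with a runner-chosen shard (hypothetical shape)."""
    key: Optional[K]
    shard_id: ShardId


# Keyed scenario: the user key travels together with the shard.
keyed = ShardedKey(key="user-42", shard_id=ShardId(b"\x01"))

# Unkeyed scenario, option 1: reuse ShardedKey with a null placeholder key.
unkeyed_placeholder = ShardedKey(key=None, shard_id=ShardId(b"\x01"))

# Unkeyed scenario, option 2: expose only the shard identifier.
unkeyed = ShardId(b"\x01")
```

With option 2, unkeyed consumers never have to handle a placeholder key.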
Comments above copied from original document
Comment details cannot be verified
Siyuan Chen
Jun 26, 2020
Approver
Good idea!
Tyson Hamilton
Jun 26, 2020
Approver
Is there any benefit to exposing a shard id?
Since the total number of shards isn't known and the shard id is generated by the runner, how is it different, from the pipeline author's view, from a random key generated by a DoFn?
A shard id could be useful if a user requested a specific total number of shards, for example when writing files. But that is different from this dynamic sharding.
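For comparison, the author-side alternative mentioned here (a random key generated by a DoFn) might look like the sketch below. It is plain Python rather than Beam code, and `assign_random_shards`, `group_by_shard`, and the `seed` parameter are hypothetical names used only for illustration.

```python
import random
from collections import defaultdict


def assign_random_shards(elements, num_shards, seed=0):
    """Pair each element with a random shard in [0, num_shards)."""
    rng = random.Random(seed)
    return [(rng.randrange(num_shards), element) for element in elements]


def group_by_shard(pairs):
    """Collect elements per shard, as a grouping step would."""
    groups = defaultdict(list)
    for shard, element in pairs:
        groups[shard].append(element)
    return dict(groups)


pairs = assign_random_shards(["a", "b", "c", "d", "e"], num_shards=2)
groups = group_by_shard(pairs)
```

The visible difference is that the author has to fix `num_shards` up front, whereas with runner-determined sharding the count is unknown to the author and chosen by the runner.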
Siyuan Chen
Jun 29, 2020
Approver
Agreed that the content of the shard id should not matter. One case where we would probably need to expose the shard id is when downstream stateful DoFns need the form of the sharded keys (otherwise it's unclear whether the user states there are sharded or not). If that turns out to be a need specific to Dataflow, then it does not have to be exposed to the SDK, imo.
Luke Cwik
Jun 29, 2020
Approver
Exposing them would only be necessary if a downstream stateful DoFn relied on having the exact same sharding as the upstream stateful DoFn. If the shard isn't exposed, then a regular stateful DoFn would have to GBK to guarantee that all the execution is key partitioned. If the downstream stateful DoFn also says "I support sharding", then a runner could choose to reshard as well, so the shard id may not be maintained.
Tyson Hamilton
Jul 8, 2020
Approver
If a runner were capable of detecting that sharding was maintained across a series of DoFns and then required again by another stateful DoFn, it seems possible to avoid another GBK to guarantee the partitioning by fusing them together?
There are a lot of features required to achieve this that I'm not sure are available in Beam today, for example DoFn metadata for properties (e.g. shard preserving).
Properties required by a specific DoFn can be back propagated to see if they're already provided by a predecessor, or if a predecessor operation can be coerced into satisfying them (e.g. a partition by key). Intermediate DoFns that preserve that required property (e.g. the shard preserving property) could be included in the analysis.
IIRC the stateful DoFn explicitly inserts a partition during expansion before submitting the pipeline. It could be possible to change this to be expressed as a required property of the stateful DoFn and this would allow the runner to decide how best to achieve those properties, possibly avoiding an unnecessary shuffle.
That would mean no shard id would need to be exposed here. I'm not sure that the effort/scope of doing this now makes sense, but APIs can be difficult to change. WDYT?
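The back-propagation idea in this comment can be sketched for a linear chain of steps. This is a toy model under stated assumptions: `Step`, its `provides` / `preserves` / `requires` fields, and the `"sharded"` property name are hypothetical and do not correspond to existing Beam metadata.

```python
from dataclasses import dataclass


@dataclass
class Step:
    name: str
    provides: frozenset = frozenset()   # properties established on the output
    preserves: frozenset = frozenset()  # properties passed through unchanged
    requires: frozenset = frozenset()   # properties needed on the input


def shuffle_needed(steps, index, prop):
    """Walk backward from steps[index] to see if `prop` already holds."""
    for pred in reversed(steps[:index]):
        if prop in pred.provides:
            return False  # a predecessor establishes the property
        if prop not in pred.preserves:
            return True   # the property is lost at this predecessor
        if prop in pred.requires:
            return False  # predecessor's input had it and it was preserved
    return True  # reached the source without finding a provider


def plan_shuffles(steps):
    """Names of the steps that need a shuffle inserted in front of them."""
    return [
        step.name
        for i, step in enumerate(steps)
        if any(shuffle_needed(steps, i, p) for p in step.requires)
    ]


sharded = frozenset({"sharded"})
chain = [
    Step("read"),
    Step("stateful_a", preserves=sharded, requires=sharded),
    Step("map", preserves=sharded),
    Step("stateful_b", preserves=sharded, requires=sharded),
]
print(plan_shuffles(chain))  # ['stateful_a']
```

In this model only the first stateful step needs a shuffle. If `map` did not declare that it preserves `"sharded"`, the second stateful step would need one too.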
Luke Cwik
Jul 8, 2020
Approver
I wasn't disagreeing with you, just pointing out that what we want to express is not that this DoFn preserves the shard, but that it needs to have the shard preserved.
Siyuan Chen
Jul 8, 2020
Approver
IIUC the stateful DoFns are already marked with a key_state property, based on which the backend decides whether or not to insert a shuffle, so it sounds possible to add additional properties to signal that sharding is preserved. I think it's reasonable not to expose the shard id in the user-facing API, and I feel we all agree on that.
Luke Cwik
Jul 8, 2020
Approver
I think the point here is that we could support not exposing the shard id and support the case where the shard id needs to be preserved across multiple DoFns without the shard id being visible to the SDK (so completely within the runner).
Siyuan Chen
Jul 8, 2020
Approver
Agreed.
Tyson Hamilton
Jun 26, 2020
Approver
I think this is true and desirable.
The value of a shard id for dynamically sharded data isn't obvious to me.
This wording suggests that the SDK would be aware of the concept in all scenarios, but if it were part of the CacheToken, the SDK wouldn't truly know about it.
Siyuan Chen
Jun 26, 2020
Approver
Would the SDK harness need to parse the cache token somehow? Currently the GetStateRequest (the StateKey) doesn't seem to contain the cache token, but the runner needs to know which shard is requesting state.
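One way to picture the question is a runner that resolves the shard from an opaque token which the SDK side echoes back without parsing. The sketch below is not the Beam Fn API: `StateRequest`, `RunnerStateStore`, and the `token` field are hypothetical names, and carrying a token on every state request is an assumption made only for illustration.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class StateRequest:
    user_key: str
    token: bytes  # opaque to the SDK side; echoed back unchanged


class RunnerStateStore:
    """Runner-side state, partitioned by (shard, user key)."""

    def __init__(self):
        self._shard_for_token = {}
        self._state = {}

    def issue_token(self, token, shard):
        # Only the runner knows which shard a token stands for.
        self._shard_for_token[token] = shard

    def append(self, request, value):
        shard = self._shard_for_token[request.token]
        self._state.setdefault((shard, request.user_key), []).append(value)

    def get(self, request):
        shard = self._shard_for_token[request.token]
        return list(self._state.get((shard, request.user_key), []))


store = RunnerStateStore()
store.issue_token(b"t1", shard=0)
store.issue_token(b"t2", shard=1)
store.append(StateRequest("k", b"t1"), "x")
store.append(StateRequest("k", b"t2"), "y")
```

Here the same user key reads different state depending on the token, while the SDK side never interprets the token's contents.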
GroupIntoBatches with Runner Determined Sharding