Re: about PCollection process


Hello,

If I understand correctly, you read some parameters from a file and then use them to build an HBase Scan. If so, the current HBaseIO API does not support this, but there is ongoing work to support it transparently with the new SDF (Splittable DoFn) API. You can track the progress in this JIRA: https://issues.apache.org/jira/browse/BEAM-4020
Hopefully it will be ready in the coming days or weeks.

In the meantime, as a workaround, you can extract the scan parameters from the files and then apply a ParDo whose DoFn requests the data from HBase itself, similar to what the SDF implementation does. For reference:
https://github.com/iemejia/beam/blob/2f9b54c6efa1c97c4b030a9b1af44b1327541e5f/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseReadSplittableDoFn.java#L37
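
To make the shape of that workaround concrete, here is a minimal sketch in plain Java. It is not Beam or HBase code: a TreeMap stands in for the HBase table, and the names PerLineRangeLookup, rowKey, scan and lookupAll are made up for illustration. The point it tries to show is that each input line yields zero or more flat records instead of a nested collection. In a real pipeline, the body of the loop in lookupAll would live in the DoFn's @ProcessElement method and call c.output(...) once per record, so the result would be a PCollection<KV<String, RecordData>>.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical in-memory model of "one range scan per input line".
final class PerLineRangeLookup {

    // Mirrors the "%s-%s" row-key layout used in the original question.
    static String rowKey(String vin, String time) {
        return String.format("%s-%s", vin, time);
    }

    // Stand-in for a range scan: start key inclusive, stop key exclusive.
    // Assumes startTime sorts before or equal to endTime.
    static List<Map.Entry<String, String>> scan(
            NavigableMap<String, String> table, String vin, String startTime, String endTime) {
        List<Map.Entry<String, String>> rows = new ArrayList<>();
        for (Map.Entry<String, String> e :
                table.subMap(rowKey(vin, startTime), true, rowKey(vin, endTime), false).entrySet()) {
            // Copy each entry so the result does not depend on the table afterwards.
            rows.add(new AbstractMap.SimpleImmutableEntry<>(e));
        }
        return rows;
    }

    // Stand-in for the DoFn: every input line contributes its records
    // to one flat output list, never a list of lists.
    static List<Map.Entry<String, String>> lookupAll(
            NavigableMap<String, String> table, List<String> vins, String startTime, String endTime) {
        List<Map.Entry<String, String>> out = new ArrayList<>();
        for (String vin : vins) {
            out.addAll(scan(table, vin, startTime, endTime));
        }
        return out;
    }

    public static void main(String[] args) {
        NavigableMap<String, String> table = new TreeMap<>();
        table.put("A-100", "a1");
        table.put("A-150", "a2");
        table.put("A-200", "a3");
        table.put("B-120", "b1");

        for (Map.Entry<String, String> kv : lookupAll(table, Arrays.asList("A", "B"), "100", "200")) {
            System.out.println(kv.getKey() + " -> " + kv.getValue());
        }
    }
}
```

In actual Beam code the HBase connection would typically be opened once per DoFn instance (for example in a @Setup method) rather than once per element.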

Hope this helps,
Ismaël

On Thu, Jul 5, 2018 at 4:53 AM Frank Li <surpass_li@xxxxxxxxxx> wrote:
Hello everyone,
        I'm running a Beam pipeline that uses TextIO to read some text from a text file, followed by a PTransform that searches HBase for each line. The result is a PCollection<PCollection<KV<String, RecordData>>>.


@Override
public PCollection<PCollection<KV<String, RecordData>>> expand(PCollection<String> lines) {

    PCollection<PCollection<KV<String, RecordData>>> results = lines
        .apply(ParDo.of(new DoFn<String, PCollection<KV<String, RecordData>>>() {
            @ProcessElement
            public void processElement(ProcessContext c) {
                String vin = c.element();

                Pipeline pipelineHbase = Pipeline.create(c.getPipelineOptions());

                HBaseIO.Read read = HBaseIO.read().withConfiguration(conf).withTableId(hbaseTable).withKeyRange(
                    Bytes.toBytes(String.format("%s-%s", vin, startTime)),
                    Bytes.toBytes(String.format("%s-%s", vin, endTime)));
                PCollection<Result> results = pipelineHbase.apply(read);

                PCollection<KV<String, RecordData>> recordResults = results
                    .apply(ParDo.of(new Result2RecordNoModifyDataFn()));

                c.output(recordResults);
            }

        }));

    return results;
}


How can I process a PCollection<PCollection<KV<String, RecordData>>>?