-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathtest_beam.py
41 lines (32 loc) · 1.13 KB
/
test_beam.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
class ConvertToByteArray(beam.DoFn):
    """DoFn that encodes each input string as a newline-terminated UTF-8 bytearray.

    The original version carried empty ``__init__``/``setup`` overrides and a
    ``try/except Exception as e: raise e`` wrapper around the yield — all
    no-ops. The empty overrides are dropped (the ``DoFn`` base provides them),
    and the re-raise wrapper is removed so exceptions propagate with their
    original traceback instead of being caught and immediately re-raised.
    """

    def process(self, row):
        """Yield *row* with a trailing newline, encoded as a UTF-8 bytearray.

        Args:
            row: a string element from the upstream PCollection.

        Yields:
            bytearray: ``row + '\\n'`` encoded as UTF-8.
        """
        yield bytearray(row + '\n', 'utf-8')
def run():
    """Build and run a small demo pipeline against a remote PortableRunner.

    Creates a single sentence, splits it into words, converts each word to a
    newline-terminated UTF-8 bytearray via ``ConvertToByteArray``, groups the
    results, and prints them. The job is submitted to the job endpoint
    configured in the pipeline options below.
    """
    pipeline_options = PipelineOptions([
        "--runner=PortableRunner",
        # "--job_endpoint=192.168.1.250:7077", # sparkmaster
        "--job_endpoint=136.156.88.48:7077", #
        #"--artifact_endpoint=10.0.2.15:30091",
        #"--save_main_session",
        #"--environment_type=DOCKER",
        #"--environment_config=docker.io/apache/beam_python3.7_sdk:2.33.0"
    ])

    with beam.Pipeline(options=pipeline_options) as pipeline:
        _ = (
            pipeline
            | 'Create words' >> beam.Create(['this is working'])
            | 'Split words' >> beam.FlatMap(lambda words: words.split(' '))
            | 'Build byte array' >> beam.ParDo(ConvertToByteArray())
            | 'Group' >> beam.GroupBy()  # Do future batching here
            | 'print output' >> beam.Map(print)
        )


if __name__ == "__main__":
    run()