-
Notifications
You must be signed in to change notification settings - Fork 14
Expand file tree
/
Copy pathmain.py
More file actions
22 lines (16 loc) · 755 Bytes
/
main.py
File metadata and controls
22 lines (16 loc) · 755 Bytes
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
import argparse
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from my_package.transform import filter_cart_requests
from my_package.transform import load_apache_logs
def run(argv=None):
    """Build and run the cart-request filtering pipeline.

    The application flags ``--input`` and ``--output`` are parsed here. Every
    argument this parser does not recognise (for example ``--runner``) is
    forwarded to Beam as pipeline options.

    Args:
        argv: Command-line arguments to parse, excluding the program name.
            When ``None`` (the default), ``argparse`` reads ``sys.argv[1:]``,
            which matches the original behaviour. Passing an explicit list
            lets the pipeline be launched programmatically or from tests.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--input",
        dest="input",
        required=True,
        help="Input location handed to load_apache_logs.",
    )
    parser.add_argument(
        "--output",
        dest="output",
        required=True,
        help="Output path prefix handed to beam.io.WriteToText.",
    )
    # parse_known_args (not parse_args) so that Beam-specific flags are passed
    # through to PipelineOptions instead of being rejected as unknown.
    app_args, pipeline_args = parser.parse_known_args(argv)

    # Leaving the ``with`` block runs the pipeline and waits for it to finish.
    with beam.Pipeline(options=PipelineOptions(pipeline_args)) as p:
        # NOTE(review): presumably yields one element per parsed log line;
        # confirm against my_package.transform.load_apache_logs.
        data = load_apache_logs(p, app_args.input)
        # Keep only the elements for which filter_cart_requests is truthy.
        output = data | "FILTER" >> beam.Filter(filter_cart_requests)
        output | "WRITE" >> beam.io.WriteToText(app_args.output)
if __name__ == '__main__':
    # Launch the pipeline only when this file is executed as a script;
    # importing the module (e.g. from another module) must not start a job.
    run()