# AWS Glue Streaming Tutorials - Working with Studio Notebook
##### You are now running an AWS Glue Studio notebook. To start using your notebook, you need to start an AWS Glue Interactive Session.


### Optional: Run this cell to see available notebook commands ("magics").


In [None]:
%help

### Start by setting the Glue job type to streaming (run this cell before the session is started in the next cell)

In [None]:
%streaming

###  Run this cell to set up and start your interactive session.


In [None]:
%glue_version 3.0

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue import DynamicFrame
from datetime import datetime
from pyspark.sql.types import StructType, StructField, StringType, LongType
from pyspark.sql.functions import lit,col,from_json, split
import boto3
  
sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)

### Define the variables to be used — replace the placeholder values with your output Glue Data Catalog database and table names


In [None]:
# Glue Data Catalog destination for the processed stream: replace both
# "REPLACE_ME" placeholders with your database and table names.
output_database_name = "REPLACE_ME"
output_table_name = "REPLACE_ME"

# Tutorial resource names are derived from the caller's AWS account id and
# the region reported by a default boto3 S3 client.
account_id = boto3.client("sts").get_caller_identity()["Account"]
region_name = boto3.client('s3').meta.region_name
stream_arn_name = "arn:aws:kinesis:{}:{}:stream/GlueStreamTest-{}".format(region_name, account_id, account_id)
s3_bucket_name = "streaming-tutorial-s3-target-{}".format(account_id)

# Both the written output and the streaming checkpoint live in the tutorial
# bucket, under separate prefixes.
output_location = "s3://{}/streaming_output/".format(s3_bucket_name)
checkpoint_location = "s3://{}/checkpoint_location/".format(s3_bucket_name)


### Read the events from Amazon Kinesis Data Streams


In [None]:
# Streaming source: the Kinesis stream identified by stream_arn_name, read
# from the earliest available position, with records classified as JSON and
# schema inference enabled.
kinesis_connection_options = {
    "typeOfData": "kinesis",
    "streamARN": stream_arn_name,
    "classification": "json",
    "startingPosition": "earliest",
    "inferSchema": "true",
}

data_frame = glueContext.create_data_frame.from_options(
    connection_type="kinesis",
    connection_options=kinesis_connection_options,
    transformation_ctx="data_frame",
)


### Sample and print the incoming records
#### The sampling is for debugging purposes only. You may comment out the entire code cell below before deploying the actual code.

In [None]:
# Debug-only peek at the stream. Sampling options: pollingTimeInMs=20000
# (20 s) and a 5-second window; no custom sampling function (None).
sampling_options = {"pollingTimeInMs": "20000", "windowSize": "5 seconds"}

sampled_dynamic_frame = glueContext.getSampleStreamingDynamicFrame(
    data_frame, sampling_options, None
)

# Report how many records were sampled, their schema, and the first 10 rows
# (truncate=False so long values are shown in full).
count_of_sampled_records = sampled_dynamic_frame.count()
print(count_of_sampled_records)
sampled_dynamic_frame.printSchema()
sampled_dynamic_frame.toDF().show(10, False)
  

### Define the `processBatch` function, which is executed for every micro-batch
#### The business logic is defined in this function.

In [None]:
def processBatch(data_frame, batchId):
    """Transform one streaming micro-batch and write it to S3 and the Glue Data Catalog.

    Passed as ``batch_function`` to ``glueContext.forEachBatch``. Empty batches
    are skipped. For a non-empty batch this:

      1. parses the raw JSON payload column into the ventilator schema below;
      2. keeps only eventtime, manufacturer, o2stats, serialnumber and
         ventilatorid, renaming o2stats to oxygen_stats;
      3. adds ingest_year / ingest_month / ingest_day (the UTC date at which the
         batch is processed, computed once per batch) and serial_identifier
         (the part of serialnumber before the first '-');
      4. writes the result in glueparquet format to ``output_location``,
         partitioned by ingest_year/ingest_month/ingest_day only, and updates
         ``output_database_name``.``output_table_name`` in the Data Catalog.

    Args:
        data_frame: The micro-batch Spark DataFrame. Assumed to carry the raw
            record in the ``$json$data_infer_schema$_temporary$`` column (the
            column parsed below) — TODO confirm against the Glue Kinesis source.
        batchId: Micro-batch identifier supplied by forEachBatch; unused.

    Reads the notebook globals ``glueContext``, ``output_location``,
    ``output_database_name`` and ``output_table_name``.
    """
    from datetime import timezone

    # Emptiness probe: fetching at most one row avoids evaluating the whole
    # batch the way count() does (DataFrame.isEmpty needs Spark >= 3.3).
    if not data_frame.head(1):
        return

    schema = StructType([StructField("eventtime", StringType()),
                         StructField("manufacturer", StringType()),
                         StructField("minutevolume", LongType()),
                         StructField("o2stats", LongType()),
                         StructField("pressurecontrol", LongType()),
                         StructField("serialnumber", StringType()),
                         StructField("ventilatorid", LongType())])

    # 1/ Parse the raw payload into typed top-level columns.
    parsed_df = data_frame.select(
        from_json(col("$json$data_infer_schema$_temporary$").cast("string"), schema).alias("data")
    ).select("data.*")

    # 2/ Project the columns we keep and give o2stats a descriptive name.
    projected_df = (
        parsed_df
        .select("eventtime", "manufacturer", "o2stats", "serialnumber", "ventilatorid")
        .withColumnRenamed("o2stats", "oxygen_stats")
    )

    # 3/ Derived columns. The ingest date is the processing time on the driver,
    # not the record's eventtime.
    # NOTE(review): month/day are not zero-padded ("1".."12"), so partition
    # values do not sort lexicographically; kept as-is so existing partitions
    # stay compatible.
    current_datetime_utc = datetime.now(timezone.utc)
    enriched_df = (
        projected_df
        .withColumn("ingest_year", lit(str(current_datetime_utc.year)))
        .withColumn("ingest_month", lit(str(current_datetime_utc.month)))
        .withColumn("ingest_day", lit(str(current_datetime_utc.day)))
        .withColumn("serial_identifier", split(col("serialnumber"), "-").getItem(0))
    )

    kinesis_dynamic_frame = DynamicFrame.fromDF(enriched_df, glueContext, "kinesis_dynamic_frame")

    # 4/ Write to S3 and keep the catalog table's schema/partitions in sync.
    amazon_s3_node = glueContext.getSink(
        path=output_location,
        connection_type="s3",
        updateBehavior="UPDATE_IN_DATABASE",
        partitionKeys=["ingest_year", "ingest_month", "ingest_day"],
        enableUpdateCatalog=True,
        transformation_ctx="amazon_s3_dyf",
    )
    amazon_s3_node.setCatalogInfo(
        catalogDatabase=output_database_name, catalogTableName=output_table_name
    )
    amazon_s3_node.setFormat("glueparquet")
    amazon_s3_node.writeFrame(kinesis_dynamic_frame)

### Start the micro-batch execution, which calls `processBatch` for every micro-batch

In [None]:
# Run processBatch on every micro-batch of the Kinesis data_frame, using a
# 10-second window; streaming progress is checkpointed to checkpoint_location.
micro_batch_options = dict(
    windowSize="10 seconds",
    checkpointLocation=checkpoint_location,
)

glueContext.forEachBatch(
    frame=data_frame, batch_function=processBatch, options=micro_batch_options
)

##### If everything went well, you wouldn't see any output or print statements from the above cell. You can inspect the S3 location or the Athena table for the data.