---
author_name: Aum Chotaliya
author_profile: https://github.tools.sap/I748539
parser: v2
auto_validation: true
time: 30
tags: [ tutorial>Intermediate, software-product>sap-hana-cloud, tutorial>license]
primary_tag: software-product-function>sap-hana-cloud--data-lake
---

# Apache Kafka Python Sample: Consuming and Writing Data into SAP HANA Cloud, data lake Files
<!-- description --> This tutorial demonstrates how to set up a local Apache Kafka instance and create Python scripts that produce data and consume it into SAP HANA Cloud, data lake Files.
## Prerequisites
 - Apache Kafka installed.
 - Python installed.
 - Basic knowledge of Python and Kafka.
 - Access to SAP Business Technology Platform (BTP) and a provisioned data lake instance with data lake Files (HDLFS) certificates configured. Refer to the [SAP HANA Cloud Data Lake Setup](https://developers.sap.com/mission.hana-cloud-data-lake-get-started.html) for data lake setup and this [tutorial](https://developers.sap.com/tutorials/data-lake-file-containers-hdlfscli.html) for configuring certificates.
 - A running local Kafka server listening on `localhost:9092`.

## You will learn
 - How to create a Kafka producer script in Python.
 - How to create a Kafka consumer script in Python.
 - How to integrate and write data into SAP HANA Cloud, data lake Files.

## Intro
Apache Kafka is a distributed event streaming platform capable of handling trillions of events a day. In this tutorial, you will use a local Kafka instance and create Python scripts to consume data and write it into SAP HANA Cloud, data lake Files. Specifically, we will send random temperature and timestamp data, which will be processed and stored in the data lake in CSV format.

---

### Set Up Local Apache Kafka Instance

Ensure that you have a local Apache Kafka instance up and running. If not, you can set up Kafka on your local machine by following the official [Kafka Quickstart Guide](https://kafka.apache.org/quickstart).

To follow this tutorial smoothly, you need to create a Kafka topic named `KafkaDemo`:

```Shell
kafka-topics.sh --create --topic KafkaDemo --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1
```
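
Optionally, you can confirm that the topic is visible to the Python client before moving on. This is a minimal sketch, assuming the broker is reachable at `localhost:9092` and that `kafka-python` is installed (installation is covered later in this tutorial); on Python 3.12 or newer you may also need the `six` compatibility shim shown in the producer script before importing `kafka`.

```python
from kafka import KafkaConsumer

# Connect to the local broker and list the topics it knows about
consumer = KafkaConsumer(bootstrap_servers=['localhost:9092'])
print('KafkaDemo' in consumer.topics())  # should print True if the topic was created
consumer.close()
```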

### Setting Up Producer Code

We'll use a Python script to produce messages to the Kafka topic.

The producer script generates random temperature and timestamp data and sends it as JSON messages to the Kafka topic named `KafkaDemo`. In this example, we will send roughly 512 KB of data.
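
Each record the producer builds has the shape shown below; the values here are only illustrative, since the script generates them randomly.

```python
import json
from datetime import datetime

# One illustrative record, matching the fields produced by Producer.py
sample = {
    'ID': '1',
    'temperature': '23.57',                  # random value between -20.0 and 40.0
    'timestamp': datetime.now().isoformat()  # ISO 8601 timestamp
}
print(json.dumps(sample))
```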

Create a folder called `localkafka`, which will hold the required certificates and the producer and consumer code, and change into it:

```Shell
mkdir localkafka
cd localkafka
```

Within the `localkafka` folder, create a subfolder for certificates called `Certs`.

Copy the `client.crt` and `client.key` files that were configured for your data lake instance into the `Certs` folder.

Here is what the folder structure looks like:
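
A sketch of the expected layout (`Producer.py` and `Consumer.py` are created in the next steps):

```
localkafka/
├── Certs/
│   ├── client.crt
│   └── client.key
├── Producer.py
└── Consumer.py
```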

Install the Kafka Python client. The scripts below also import `six` directly for the Python 3.12 compatibility fix, so if it is not already available in your environment, install it too (`pip install six`):

```Shell
pip install kafka-python
```
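
To quickly confirm that the client library is importable (the package installs as `kafka-python` but is imported as `kafka`), you can run this small sanity check; the Python 3.12 shim mirrors the one used in the scripts below.

```python
import sys
import six

# On Python 3.12 and above, kafka-python needs this shim before it is imported
if sys.version_info >= (3, 12, 0):
    sys.modules['kafka.vendor.six.moves'] = six.moves

import kafka
print(kafka.__version__)  # prints the installed kafka-python version
```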

Write the following code into your producer script (`Producer.py`). Make sure to fill in your instance's information where required in the code.

```python
import json
import random
import sys
from datetime import datetime

import six

# Compatibility fix for kafka-python with Python 3.12 and above.
# It must be applied before the kafka package is imported.
if sys.version_info >= (3, 12, 0):
    sys.modules['kafka.vendor.six.moves'] = six.moves

from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    api_version=(0, 10),
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

max_cap = 512 * 1024  # Maximum buffer size in bytes (512 KB)
buffer_size = 0
buffer = []
id = 1

# Generate random temperature readings until the buffer holds roughly 512 KB of data
while buffer_size < max_cap:
    temperature = round(random.uniform(-20.0, 40.0), 2)
    timestamp = datetime.now().isoformat()
    data = {
        'ID': str(id),
        'temperature': str(temperature),
        'timestamp': str(timestamp)
    }
    message = json.dumps(data)
    buffer.append(message)
    buffer_size += len(message.encode('utf-8'))
    id += 1

# Send the buffered messages to the designated Kafka topic
for msg in buffer:
    producer.send('KafkaDemo', value=msg)  # 'KafkaDemo' is the Kafka topic name

producer.close()
print("Data Successfully Sent")
```
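
Note that each record is serialized twice: the loop calls `json.dumps` on the dictionary, and the producer's `value_serializer` then applies `json.dumps` to the resulting string again. The snippet below (illustrative values only) shows what actually goes over the wire, which is why the consumer script later strips the escaped quotes before parsing.

```python
import json

data = {'ID': '1', 'temperature': '23.5', 'timestamp': '2024-01-01T00:00:00'}
message = json.dumps(data)                  # first serialization, inside the while loop
wire = json.dumps(message).encode('utf-8')  # second serialization, by value_serializer
print(wire)
# b'"{\\"ID\\": \\"1\\", \\"temperature\\": \\"23.5\\", \\"timestamp\\": \\"2024-01-01T00:00:00\\"}"'
# Consumer.py replaces \" with " and strips the outer quotes before calling json.loads.
```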

Run the script (for example, with `python Producer.py`).

Example output:

The script prints `Data Successfully Sent` once the buffered messages have been sent to the Kafka broker.

### Setting Up Consumer Code

The consumer script reads the messages from the `KafkaDemo` topic, converts them to CSV format, and uploads the data to data lake Files. It creates a new file for every 256 KB of data read.

Create another Python file called `Consumer.py`.

Write the following code into your consumer script (`Consumer.py`). Make sure to fill in your instance's information where required in the code.

```python
import http.client
import json
import ssl
import sys
import time

import six

# Compatibility fix for kafka-python with Python 3.12 and above.
# It must be applied before the kafka package is imported.
if sys.version_info >= (3, 12, 0):
    sys.modules['kafka.vendor.six.moves'] = six.moves

from kafka import KafkaConsumer

FILES_REST_API = '********-****-****-****-****************.files.hdl.prod-us30.hanacloud.ondemand.com'  # The REST API endpoint for your data lake instance
CONTAINER = '********-****-****-****-****************'  # The instance ID of your data lake container
CRT_PATH = 'Certs/client.crt'  # The file path to the client certificate for your data lake instance
KEY_PATH = 'Certs/client.key'  # The file path to the client key for your data lake instance


def upload_file(msg):
    # Upload the accumulated CSV content as a new, timestamped file in the 'bronze' directory
    file_name = time.strftime("%Y-%m-%d-%H%M%S") + ".csv"
    context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    context.check_hostname = False
    context.verify_mode = ssl.CERT_NONE
    context.load_cert_chain(certfile=CRT_PATH, keyfile=KEY_PATH)
    request_url = '/webhdfs/v1/bronze/' + file_name + '?op=CREATE&data=true'
    request_headers = {
        'x-sap-filecontainer': CONTAINER,
        'Content-Type': 'application/octet-stream'
    }
    connection = http.client.HTTPSConnection(FILES_REST_API, port=443, context=context)
    connection.request(method="PUT", url=request_url, body=msg, headers=request_headers)
    response = connection.getresponse()
    print("File " + file_name + " created")
    response.close()


consumer = KafkaConsumer(
    'KafkaDemo',  # Replace with the topic name you used in the producer code, if different
    bootstrap_servers=[
        'localhost:9092'
    ],
    auto_offset_reset='earliest',
    enable_auto_commit=True,
)

heading = False
max_cap = 256 * 1024  # Write a new file for every 256 KB of data read
buffer = 0
msg = ''
try:
    for message in consumer:
        if not heading:
            msg += "Id,Temperature,Timestamp\n"
            heading = True
        # Each record arrives as a double-serialized JSON string; unwrap it back into a dict
        msg_str = message.value.decode('utf-8')
        msg_without_backslash = msg_str.replace('\\"', '"')
        json_string = msg_without_backslash.strip('"')
        json_obj = json.loads(json_string)

        # Append the record as a CSV row
        row = [str(json_obj['ID']), str(json_obj['temperature']), str(json_obj['timestamp'])]
        csv_row = ','.join(row) + '\n'
        msg += csv_row
        buffer += len(message.value)
        if buffer >= max_cap:
            upload_file(msg)
            buffer = 0
            msg = ''
            heading = False
except KeyboardInterrupt:
    print("Code stopped by User")

consumer.close()
```

Run the script (for example, with `python Consumer.py`).

Example output:

You should see messages in the terminal confirming that CSV files have been created (one `File <name>.csv created` line per upload).

You can also see the new files reflected in SAP HANA database explorer, where you can view the contents of a file by opening it.

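If you prefer to verify the uploads programmatically instead of through SAP HANA database explorer, you can list the files in the `bronze` directory through the Files REST API. The sketch below is hedged: it reuses the endpoint, container, and certificate values from `Consumer.py` (placeholders here) and assumes the WebHDFS-style `LISTSTATUS` operation is available on your instance.

```python
import http.client
import json
import ssl

FILES_REST_API = '<your Files REST API endpoint>'  # same value as in Consumer.py
CONTAINER = '<your data lake instance ID>'         # same value as in Consumer.py
CRT_PATH = 'Certs/client.crt'
KEY_PATH = 'Certs/client.key'

# The same client-certificate TLS setup used by Consumer.py
context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
context.check_hostname = False
context.verify_mode = ssl.CERT_NONE
context.load_cert_chain(certfile=CRT_PATH, keyfile=KEY_PATH)

# List the contents of the 'bronze' directory that the consumer writes to
connection = http.client.HTTPSConnection(FILES_REST_API, port=443, context=context)
connection.request(
    method="GET",
    url='/webhdfs/v1/bronze?op=LISTSTATUS',
    headers={'x-sap-filecontainer': CONTAINER}
)
response = connection.getresponse()
listing = json.loads(response.read())
for entry in listing.get('FileStatuses', {}).get('FileStatus', []):
    print(entry.get('pathSuffix'))
response.close()
connection.close()
```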

### Explore and Experiment!

In this tutorial, you've learned how to set up an Apache Kafka instance locally and create Python scripts for producing and consuming data, which is then written to data lake Files. This is just the beginning of what you can do with Kafka and data lake Files. Here are some additional ideas and directions you can explore:

1. **Import Data Files into HANA Cloud**: After writing data to data lake Files, learn how to import these files into your HANA Cloud instance for further analysis and reporting. This can help centralize your data for comprehensive insights and decision-making.
2. **Use Spark for Analysis and Transformation**: Utilize Apache Spark to perform complex data transformations and analyses on data stored in data lake Files. Leverage Spark's powerful distributed processing capabilities to handle large datasets efficiently and gain deeper insights.
3. **Kafka Streams**: Explore Kafka Streams to perform stream processing with your data in real time, allowing for filtering, transforming, aggregating, and joining data streams efficiently.
---