In this blog, we discuss using Spark Structured Streaming on DataStax Enterprise (DSE) 6.8.15 to process crypto trade information made available on a Kafka topic. The data sent to the Kafka topic is generated by the CryptoCompare API, streamed over a WebSocket connection, and consumed using Akka.
Introduction to the Project
The project can be run in two modes: in-memory mode and Kafka mode. In in-memory mode, the data streamed from the CryptoCompare WebSocket is consumed directly and the aggregations are displayed on the console. In Kafka mode, the data streamed over the WebSocket is consumed using Akka and then sent to a Kafka topic; a Spark job written in Scala listens on that topic and performs aggregations on the consumed data before the results are finally written to a Cassandra database.
The tech stack used is as follows:
- Apache Spark 2.4.0 as provided on DSE 6.8.15
- Akka version 2.5.32
- Kafka version 2.2.1
WebSocket is a protocol that provides a bi-directional channel between a client (usually a browser) and a web server, typically run over an upgraded HTTP(S) connection. Data is exchanged in messages, either as binary data or Unicode text.
If running this project on Docker, make sure the necessary ports are exposed in the relevant docker-compose.yml files: one for the Kafka container and one for the DSE container, both available on GitHub. The referenced ports include the following (see the illustrative excerpt after this list):
- Kafka: 9092 for communication to the broker.
- Cassandra: 9042 to write to the provided keyspace.
- Spark: 4000 to allow for monitoring the Spark job through the Spark UI. This is optional.
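For illustration, the relevant port mappings might look like the excerpt below. This is only a sketch: in the repo the Kafka and DSE services live in separate docker-compose.yml files, and the host ports shown here are assumptions; the files on GitHub are authoritative.

services:
  kafka:
    ports:
      - "9092:9092"   # Kafka broker
  dse-server:
    ports:
      - "9042:9042"   # Cassandra CQL native transport
      - "4000:4000"   # Spark UI (optional)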
Introduction to the Walkthrough
The GitHub repo for this project can be found here. You can follow along below, or use the link to follow along on GitHub.
Walkthrough
If you are not using the docker-compose.yml files provided with this demo to create the necessary containers, please ensure you have DSE 6.8.15 running and a Kafka 2.2.1 broker installed. Your DSE installation must also have Analytics enabled to allow use of the Apache Spark version bundled with it.
Setup
- Create a free account on CryptoCompare.
- Follow the instructions to set up your API_KEY. This API_KEY is what will be used to stream live messages from the WebSocket to our Spark application.
- Clone this GitHub project using your favorite IDE.
- Build the project by running
sbt clean package
- Transfer the jar file to the environment where you will run the spark-submit command. If you used the Docker scripts provided earlier to create the container, please follow the instructions below.
- Open your terminal application and run:
docker exec -it dse-server /bin/bash
mkdir -p /opt/cryptocompare
exit
docker cp akka-websockets-spark-cassandra_2.11-0.1.jar dse-server:/opt/cryptocompare/
Running the App: Memory Mode
- To run the application in memory mode, copy and paste the command shown below to submit it. Be sure to add your API_KEY in the designated placeholder.
dse spark-submit --packages "com.typesafe.akka:akka-stream_2.11:2.5.32,com.typesafe.akka:akka-actor_2.11:2.5.32,\
com.typesafe.akka:akka-http_2.11:10.1.15,com.typesafe.akka:akka-http-jackson_2.11:10.1.15,com.typesafe.akka:akka-http-spray-json_2.11:10.1.15" \
--master 'local[*]' --conf spark.sql.shuffle.partitions=8 \
akka-websockets-spark-cassandra_2.11-0.1.jar --mode memory --timeout 150 <API_KEY>
This should start your streaming application and you should see results on your console.
Running the App: Kafka Mode
To run in Kafka mode, you will need to create the needed topic as an additional step.
- If running Kafka as a Docker container, download the docker-compose.yml file from the GitHub repo and run the following commands.
cd <<path/to/docker-compose.yml>>
docker-compose up
docker exec -it kafka /bin/bash
cd /opt/bitnami/kafka/bin/
./kafka-topics.sh --create --partitions 1 --replication-factor 1 --bootstrap-server localhost:9092 --topic cryptocompare
- If running Kafka in standalone mode, run the following commands instead.
cd <<path/to/kafka_home/bin>>
./kafka-topics.sh --create --partitions 1 --replication-factor 1 --bootstrap-server localhost:9092 --topic cryptocompare
- Create a new keyspace by running the commands below in a terminal.
- If DSE is running on Docker, run this first.
docker exec -it dse-server /bin/bash
To create the keyspace and table, run the commands below.
cqlsh
CREATE KEYSPACE IF NOT EXISTS cryptocompare WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1};
CREATE TABLE cryptocompare.trademsgs1minutewindow (
    date timestamp,
    window_start timestamp,
    window_end timestamp,
    market text,
    direction text,
    fromcoin text,
    tocurrency text,
    avgprice double,
    totalquantity double,
    totalvol double,
    counttxns bigint,
    uuid timeuuid,
    PRIMARY KEY ((date), window_end, window_start, direction, market)
) WITH CLUSTERING ORDER BY (window_end DESC, window_start DESC, direction ASC, market ASC);
- Finally, submit your application to the cluster by entering the command below. Be sure to update the Kafka broker, Kafka topic, Cassandra hostname, and API_KEY in the designated spots.
dse spark-submit --packages "com.typesafe.akka:akka-stream_2.11:2.5.32,com.typesafe.akka:akka-actor_2.11:2.5.32,\
com.typesafe.akka:akka-http_2.11:10.1.15,com.typesafe.akka:akka-http-jackson_2.11:10.1.15,com.typesafe.akka:akka-http-spray-json_2.11:10.1.15,\
org.apache.spark:spark-streaming-kafka-0-10_2.11:2.4.0,org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.0,\
org.apache.kafka:kafka_2.11:2.2.1,org.apache.kafka:kafka-streams:2.2.1" \
--master 'local[*]' --conf spark.sql.shuffle.partitions=8 \
akka-websockets-spark-cassandra_2.11-0.1.jar --mode kafka --timeout 300 \
--kafkabroker <KAFKA_BROKER>:9092 --kafkatopic <KAFKA_TOPIC> --cassandraurl <Cassandra_Host> \
<API_KEY>
- Log into CQLSH to view data streamed in real-time.
cqlsh
select * from cryptocompare.trademsgs1minutewindow where date = '2021-01-08 23:00:00+0000';
Code Walkthrough and Discussion
As mentioned earlier, the Akka framework is used to consume the messages streamed from the CryptoCompare API. In Akka, a WebSocket stream is modeled with three key components: a Source, a Sink, and a Flow. The connection is started by the code shown below.
val uri = Uri(s"wss://streamer.cryptocompare.com/v2?API_key=$APIkey")
val (upgradeResponse, notused) = Http().singleWebSocketRequest(WebSocketRequest(uri), websocketFlow(streamerDFActor, timeout))
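The upgradeResponse future returned above can be inspected to confirm that the HTTP connection was actually upgraded to a WebSocket before we expect any messages to flow. A minimal sketch, assuming an implicit ExecutionContext such as system.dispatcher is in scope:

import akka.Done
import akka.http.scaladsl.model.StatusCodes

// Fail fast if the server did not accept the WebSocket upgrade
val connected = upgradeResponse.map { upgrade =>
  if (upgrade.response.status == StatusCodes.SwitchingProtocols) Done
  else throw new RuntimeException(s"WebSocket connection failed: ${upgrade.response.status}")
}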
The CryptoCompare API requires that, after the WebSocket connection is established, messages be sent along the channel indicating what types of messages the subscriber is interested in receiving. Without this subscription request, trade messages won't be streamed to the subscriber.
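Serialized, that subscription request sent back over the socket looks roughly like the JSON below (following CryptoCompare's SubAdd format; the exact subscription strings appear in the code later in this post):

{
  "action": "SubAdd",
  "subs": ["0~Coinbase~BTC~USD", "0~Binance~BTC~USDT", "0~Kraken~BTC~USD"]
}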
To facilitate this bi-directional exchange, we materialize an ActorRef-backed Source, expose it as a Publisher, and feed it back into the WebSocket flow via Source.fromPublisher, as shown below.
def websocketFlow(streamerDFActor: ActorRef, timeout: Int) = {
  import akka.stream.scaladsl._

  // Materialize an ActorRef-fed Source of outgoing text messages, exposed as a Publisher
  val (actorRef: ActorRef, publisher: Publisher[TextMessage.Strict]) =
    Source.actorRef[String](bufferSize = 16, overflowStrategy = OverflowStrategy.dropNew)
      .map(msg => TextMessage.Strict(msg))
      .toMat(Sink.asPublisher(false))(Keep.both)
      .run()

  // Incoming WebSocket messages are handled here
  val printSink: Sink[Message, Future[Done]] =
    Sink.foreach {
      case message: TextMessage.Strict => processWSJsonMsg(message.text, actorRef, streamerDFActor, timeout)
      case _ => // ignore other message types
    }

  Flow.fromSinkAndSource(printSink, Source.fromPublisher(publisher))
}
With this, message streaming starts and the messages flow into our Sink, where they are processed.
The Source.fromPublisher call keeps the Source open, rather than completing it immediately, until the caller chooses to close the stream. This is important because our WebSocket connection must remain open indefinitely for the trade and ticker messages to keep flowing in.
The Sink in our case uses the spray-json library to attempt to parse each received message into one of the expected message types. If an incoming message is successfully parsed as a WSMsgTrade, the message is sent on to a Kafka topic.
def processWSJsonMsg(json: String, actorref: ActorRef, streamerDFActor: ActorRef, timeout: Int) = {
  def asWsMsgStart(json: String) = json.parseJson.convertTo[WSMsgStart]
  def asWsMsgTrade(json: String) = json.parseJson.convertTo[WSMsgTrade]

  try {
    Try[Any](asWsMsgStart(json)).orElse(
      Try(asWsMsgTrade(json))) match {
      case Success(req: WSMsgTrade) =>
        logger.debug(s"$req")
        streamerDFActor ! req
      case Success(req: WSMsgStart) =>
        // Send a msg to start our subscription here
        actorref ! WSMsgSubRequest("SubAdd", Seq("0~Coinbase~BTC~USD", "0~Binance~BTC~USDT", "0~Kraken~BTC~USD", "0~CoinDeal~BTC~USD", "0~Gemini~BTC~USD")).toJson.prettyPrint
      case Success(x) =>
        throw new IllegalArgumentException(s"Unknown request type: $x")
      case Failure(e) =>
        throw e
    }
  } catch {
    case e: JsonParser.ParsingException =>
      logger.warn(s"Handled invalid message $json", e.summary)
  }
}
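The WSMsgStart, WSMsgTrade, and WSMsgSubRequest types referenced above are plain case classes with spray-json formats. A minimal sketch of how they might be declared is shown below; the field names are assumptions, and the actual definitions live in the GitHub repo.

import spray.json._

object WSMessages extends DefaultJsonProtocol {
  // Hypothetical shapes -- field names are assumptions; see the repo for the real ones
  case class WSMsgStart(TYPE: String, MESSAGE: String)
  case class WSMsgTrade(market: String, fromcoin: String, tocurrency: String,
                        direction: String, price: Double, quantity: Double, eventtime: Long)
  case class WSMsgSubRequest(action: String, subs: Seq[String])

  // spray-json needs an implicit RootJsonFormat per type so parseJson.convertTo[...] works
  implicit val wsMsgStartFormat: RootJsonFormat[WSMsgStart] = jsonFormat2(WSMsgStart)
  implicit val wsMsgTradeFormat: RootJsonFormat[WSMsgTrade] = jsonFormat7(WSMsgTrade)
  implicit val wsMsgSubRequestFormat: RootJsonFormat[WSMsgSubRequest] = jsonFormat2(WSMsgSubRequest)
}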
Spark Structured Streaming is started with format "kafka" to consume the trade messages from the Kafka topic.
val kafkamsgstream = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", kafkabootstrap)
  .option("subscribe", kafkatopic)
  .option("startingOffsets", "latest")
  .load()
  .selectExpr("CAST(value AS STRING) AS wstradejson")
  .select(from_json($"wstradejson", wstradeschema).as("wstrade")) // composite column (struct)
  .selectExpr("wstrade.*").as[WSMsgTrade]
Aggregation transformations are performed on this Dataset using the flatMapGroupsWithState function, which lets us drop stale records based on the event time of each received trade message.
val groupeddf = filtereddf
  .groupByKey(t => (t.market, t.fromcoin, t.direction, t.window))
  .flatMapGroupsWithState(OutputMode.Update(), GroupStateTimeout.EventTimeTimeout())(avgTradeMsgWithWatermarkFn)

def avgTradeMsgWithWatermarkFn(key: (String, String, String, (Timestamp, Timestamp)),
                               values: Iterator[TradeMsg], state: GroupState[List[TradeMsg]]): Iterator[TradeMsgAvgByWindowPeriod] = {
  if (state.hasTimedOut) {
    state.remove()
    Iterator()
  } else {
    val groups = values.to[collection.immutable.Seq]
    val previous =
      if (state.exists) state.get
      else List()
    val updatedstate = groups.foldLeft(previous) {
      (current, record) => current :+ record
    }
    state.update(updatedstate)
    state.setTimeoutTimestamp(state.getCurrentWatermarkMs(), "5 minutes")
    stateToAverageEvent(key, state.get)
  }
}
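The stateToAverageEvent helper called at the end is what collapses the accumulated trades for a key into the per-window aggregate that eventually lands in Cassandra. A rough, simplified sketch of what it might look like is below; the field names of TradeMsg and TradeMsgAvgByWindowPeriod are assumptions based on the table schema, and the real implementation is in the repo.

def stateToAverageEvent(key: (String, String, String, (Timestamp, Timestamp)),
                        trades: List[TradeMsg]): Iterator[TradeMsgAvgByWindowPeriod] = {
  if (trades.isEmpty) Iterator()
  else {
    val (market, fromcoin, direction, (windowStart, windowEnd)) = key
    // Aggregate the trades buffered so far for this (market, coin, direction, window) key
    val totalQty = trades.map(_.quantity).sum
    val totalVol = trades.map(t => t.quantity * t.price).sum
    Iterator(TradeMsgAvgByWindowPeriod(
      window_start = windowStart, window_end = windowEnd,
      market = market, fromcoin = fromcoin, direction = direction,
      avgprice = totalVol / totalQty, totalquantity = totalQty,
      totalvol = totalVol, counttxns = trades.size.toLong))
  }
}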
Finally, the output is written to a Cassandra table as shown below.
val query = windowperiod.writeStream
  .option("checkpointLocation", "checkpoint-kafka-cassandra")
  .foreachBatch((batch, batchid) => {
    batch.withColumn("uuid", makeuuids()).write
      .option("spark.cassandra.connection.host", cassandraurl)
      .cassandraFormat("trademsgs1minutewindow", "cryptocompare")
      .mode(SaveMode.Append)
      .save()
  })
  .outputMode("update")
  .trigger(Trigger.ProcessingTime(15.seconds))
  .start()
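The makeuuids() call above is a small UDF that stamps each row with a time-based UUID so it can be written into the table's timeuuid column. A sketch of how such a UDF might be defined (the real one is in the repo):

import org.apache.spark.sql.functions.udf
import com.datastax.driver.core.utils.UUIDs

// Generate one time-based UUID string per row; the connector maps it onto the timeuuid column
val makeuuids = udf(() => UUIDs.timeBased().toString)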
Conclusion: Lessons Learned and Challenges
In this post, we have discussed how to process streaming data using Apache Spark from a Kafka broker with messages being streamed into it through a WebSocket channel.
As mentioned earlier, the CryptoCompare site requires that when a WebSocket connection is initiated, the client send a request detailing what types of messages it is interested in. Care must be taken not to close the connection once this request succeeds, as the same socket stream is used to carry the subsequent trade and ticker messages. When the client wishes to stop the messages from flowing in, a poison message is sent to stop the stream and eventually close the connection.
Another point to note is the event time on the received trade messages. Stale messages should be filtered out of the aggregation based on the event-time property of each trade message. We make use of the watermark facility in Spark Structured Streaming to drop data that arrives too late (declared via withWatermark, sketched below), and to gain finer control over when and how this "too late" data is dropped, we further use the flatMapGroupsWithState function to perform stateful streaming.
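Declaring the watermark is a one-liner on the streaming Dataset before the stateful aggregation; a simplified sketch (the event-time column name is an assumption, and the repo also derives the one-minute window column before grouping):

// Events older than (max event time seen so far - 5 minutes) become eligible to be dropped
val filtereddf = kafkamsgstream
  .withWatermark("eventtime", "5 minutes")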
Finally, the aggregated data is written to a Cassandra table using the foreachBatch function, which sends each micro-batch to the table.
One challenge experienced while building the demo was figuring out how to stop both the WebSocket and the Spark streaming query after a certain timeout. This was handled by calling the ActorSystem scheduler's scheduleOnce method with the desired timeout. Once the timeout period elapses, a message is sent to stop the WebSocket from receiving further messages; since no other non-daemon work remains after the streaming query finishes, the application stops gracefully.
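A minimal sketch of that shutdown timer, assuming system is the ActorSystem and actorRef is the Source.actorRef materialized in websocketFlow (sending it a PoisonPill completes the outgoing stream in Akka 2.5):

import scala.concurrent.duration._
import akka.actor.PoisonPill

// After `timeout` seconds, stop feeding the WebSocket flow so the stream can complete
system.scheduler.scheduleOnce(timeout.seconds) {
  actorRef ! PoisonPill
}(system.dispatcher)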
Another challenge is remembering to clear out the checkpoint directory whenever you change the transformation query between starts and stops of the streaming query. If you don't clear this folder, you will run into errors, because Spark records the progress of the stream and the current state of the transformation in the checkpoint folder, and any change to the query logic renders that saved state invalid.
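For example, assuming the checkpointLocation used above resolved to a local directory next to where spark-submit was run, clearing it between code changes is simply:

rm -rf checkpoint-kafka-cassandra

If your DSE Analytics setup resolves relative paths to DSEFS instead, remove the directory there.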
After building the demo with Spark, Kafka, Akka WebSockets, and the CryptoCompare API, we can see how easy it is to combine these stacks to process live messages from a WebSocket with Spark and push the results to a Cassandra table. As a future consideration, we could set up a live time-series graph using Grafana to display the records stored in the Cassandra table.
Does this stack make sense? Could you apply it to something else? Do you think there are any problems with it? Please feel free to share your thoughts below.
Cassandra.Link
Cassandra.Link is a knowledge base that we created for all things Apache Cassandra. Our goal with Cassandra.Link was to not only fill the gap of Planet Cassandra but to bring the Cassandra community together. Feel free to reach out if you wish to collaborate with us on this project in any capacity.
We are a technology company that specializes in building business platforms. If you have any questions about the tools discussed in this post or about any of our services, feel free to send us an email!