Perform Stream Processing by SQL
This part demonstrates real-time stream processing with SQL. It introduces basic concepts such as streams, queries, and materialized views, with examples that show how the processing engine stays easy to use while handling complex queries.
Overview
One of the most important applications of stream processing is real-time business information analysis. Imagine that we are managing a supermarket and would like to analyze the sales information to adjust our marketing strategies.
Suppose we have two streams of data:
info(product, category) // represents the category a product belongs to
visit(product, user, length) // represents how long a customer looks at a product
Unlike a table in a traditional relational database, a stream is an unbounded sequence of records that keep arriving over time. Next, we will run some analysis on the two streams to extract useful information.
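To make the difference between a stream and a table concrete, here is a minimal Python sketch that models a stream as a generator that never ends. It is only a conceptual illustration and does not use any HStreamDB API; the function visit_stream and the values it produces are hypothetical.

```python
import itertools


def visit_stream():
    """Hypothetical endless source of visit records (not an HStreamDB API)."""
    users = itertools.cycle(["Alice", "Bob", "Caleb"])
    for seq in itertools.count():
        # A new record is produced for as long as a consumer keeps asking.
        yield {"product": "Apple", "user": next(users), "length": 10 + seq % 3}


# A consumer can never "read the whole stream"; it only takes what has arrived so far.
first_three = list(itertools.islice(visit_stream(), 3))
for record in first_three:
    print(record)
```

A table can be scanned to its end, whereas the generator above has no end, so any analysis on it has to be computed continuously as records arrive.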
Requirements
Ensure you have deployed HStreamDB successfully. The easiest way is to follow the quickstart to start a local cluster. You can also use any of the other methods described in the Deployment part.
Step 1: Create related streams
We mentioned two streams, info and visit, in the overview. Now let's create them. Start an HStream SQL shell and run the following statements:
CREATE STREAM info;
+-------------+---------+----------------+-------------+
| Stream Name | Replica | Retention Time | Shard Count |
+-------------+---------+----------------+-------------+
| info        | 1       | 604800 seconds | 1           |
+-------------+---------+----------------+-------------+
CREATE STREAM visit;
+-------------+---------+----------------+-------------+
| Stream Name | Replica | Retention Time | Shard Count |
+-------------+---------+----------------+-------------+
| visit       | 1       | 604800 seconds | 1           |
+-------------+---------+----------------+-------------+
We have successfully created two streams.
Step 2: Create streaming queries
We can now create streaming queries on the streams. A query is a running task that fetches data from one or more streams and produces results continuously. Let's create a trivial query that fetches data from the stream info and outputs it:
SELECT * FROM info EMIT CHANGES;
The query will keep running until you interrupt it. Leave it running and start another query, which fetches data from the stream visit and outputs the maximum visit length for each product. Start a new SQL shell and run:
SELECT product, MAX(length) AS max_len FROM visit GROUP BY product EMIT CHANGES;
Neither of the queries will print any results since we have not inserted any data yet. So let's do that.
Step 3: Insert data into streams
There are multiple ways to insert data into the streams, such as client libraries, and all inserted data is treated the same during processing. You can refer to write data for client usage.
For consistency and ease of demonstration, we will use SQL statements.
Start a new SQL shell and run:
INSERT INTO info (product, category) VALUES ('Apple', 'Fruit');
INSERT INTO visit (product, user, length) VALUES ('Apple', 'Alice', 10);
INSERT INTO visit (product, user, length) VALUES ('Apple', 'Bob', 20);
INSERT INTO visit (product, user, length) VALUES ('Apple', 'Caleb', 10);
Switch to the shells with the running queries. You should see the expected outputs as follows:
SELECT * FROM info EMIT CHANGES;
{"category":"Fruit","product":"Apple"}
SELECT product, MAX(length) AS max_len FROM visit GROUP BY product EMIT CHANGES;
{"max_len":{"$numberLong":"10"},"product":"Apple"}
{"max_len":{"$numberLong":"20"},"product":"Apple"}
{"max_len":{"$numberLong":"20"},"product":"Apple"}
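Note that max_len changes from 10 to 20, which is expected. The third record has a length of 10, which does not exceed the current maximum, so the last output still shows 20.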
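The behavior of this aggregation can be sketched in a few lines of Python. This is a conceptual model only, assuming that the query emits one result per incoming record; it is not how HStreamDB is implemented, and the function name running_max_by_product is hypothetical.

```python
def running_max_by_product(visits):
    """Yield the current MAX(length) for a product each time a record arrives."""
    current = {}  # product -> largest length seen so far
    for product, _user, length in visits:
        current[product] = max(length, current.get(product, length))
        # One change is emitted per input record, even if the maximum is unchanged.
        yield {"product": product, "max_len": current[product]}


# The same three records that were inserted into the visit stream above.
visits = [("Apple", "Alice", 10), ("Apple", "Bob", 20), ("Apple", "Caleb", 10)]
changes = list(running_max_by_product(visits))
for change in changes:
    print(change)
```

Under that assumption, the sequence of max_len values is 10, 20, 20, which mirrors the three lines printed by the query.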
Step 4: Create materialized views
Now let's do some more complex analysis. If we want the longest visit time of each category to be available whenever we need it, the best way is to create a materialized view.
A materialized view is an object which contains the result of a query. In HStreamDB, the view is maintained and continuously updated in memory, which means we can read the results directly from the view right when needed without any extra computation. Thus getting results from a view is very fast.
Here we can create such a view:
CREATE VIEW result AS SELECT info.category, MAX(visit.length) as max_length FROM info JOIN visit ON info.product = visit.product WITHIN (INTERVAL 1 HOUR) GROUP BY info.category;
+--------------------------+---------+--------------------------+---------------------------+
| Query ID | Status | Created Time | SQL Text |
+--------------------------+---------+--------------------------+---------------------------+
| cli_generated_xbexrdhwgz | RUNNING | 2023-07-06T07:46:13+0000 | CREATE VIEW result AS ... |
+--------------------------+---------+--------------------------+---------------------------+
Note that your query ID will be different from the one shown above. Now let's try to get something from the view:
SELECT * FROM result;
It outputs no data because we have not inserted any data into the streams since the view was created. Let's do it now:
INSERT INTO info (product, category) VALUES ('Apple', 'Fruit');
INSERT INTO info (product, category) VALUES ('Banana', 'Fruit');
INSERT INTO info (product, category) VALUES ('Carrot', 'Vegetable');
INSERT INTO info (product, category) VALUES ('Potato', 'Vegetable');
INSERT INTO visit (product, user, length) VALUES ('Apple', 'Alice', 10);
INSERT INTO visit (product, user, length) VALUES ('Apple', 'Bob', 20);
INSERT INTO visit (product, user, length) VALUES ('Carrot', 'Bob', 50);
Step 5: Get results from views
Now let's find out what is in our view:
SELECT * FROM result;
{"category":"Fruit","max_length":{"$numberLong":"20"}}
{"category":"Vegetable","max_length":{"$numberLong":"50"}}
It works. Now insert more data and repeat the inspection:
INSERT INTO visit (product, user, length) VALUES ('Banana', 'Alice', 40);
INSERT INTO visit (product, user, length) VALUES ('Potato', 'Eve', 60);
And query again:
SELECT * FROM result;
{"category":"Fruit","max_length":{"$numberLong":"40"}}
{"category":"Vegetable","max_length":{"$numberLong":"60"}}
The result is updated right away.
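To illustrate why reading from the view needs no extra computation, the following Python sketch models the result view as state that is updated whenever a record arrives on either stream. It is a simplified model under stated assumptions: it ignores the WITHIN (INTERVAL 1 HOUR) window (all records are assumed to fall inside it), and the class CategoryMaxView and its methods are hypothetical names, not part of HStreamDB.

```python
from collections import defaultdict


class CategoryMaxView:
    """Toy model of the result view: join info and visit on product,
    then keep MAX(length) per category. The join time window is ignored."""

    def __init__(self):
        self.categories = defaultdict(set)  # product -> categories seen on info
        self.lengths = defaultdict(list)    # product -> visit lengths seen on visit
        self.state = {}                     # category -> max_length (the view itself)

    def _update(self, category, length):
        if category not in self.state or length > self.state[category]:
            self.state[category] = length

    def on_info(self, product, category):
        self.categories[product].add(category)
        # Join the new info record with visits that arrived earlier.
        for length in self.lengths[product]:
            self._update(category, length)

    def on_visit(self, product, user, length):
        self.lengths[product].append(length)
        # Join the new visit record with info records that arrived earlier.
        for category in self.categories[product]:
            self._update(category, length)

    def read(self):
        # Reading is a plain lookup; the work was done when records arrived.
        return dict(self.state)


view = CategoryMaxView()
for product, category in [("Apple", "Fruit"), ("Banana", "Fruit"),
                          ("Carrot", "Vegetable"), ("Potato", "Vegetable")]:
    view.on_info(product, category)
for product, user, length in [("Apple", "Alice", 10), ("Apple", "Bob", 20),
                              ("Carrot", "Bob", 50)]:
    view.on_visit(product, user, length)
first_read = view.read()

view.on_visit("Banana", "Alice", 40)
view.on_visit("Potato", "Eve", 60)
second_read = view.read()
print(first_read)
print(second_read)
```

With the same inserts as in Steps 4 and 5, the first read gives 20 for Fruit and 50 for Vegetable, and the second gives 40 and 60, matching the outputs of SELECT * FROM result shown above.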
Related Pages
For a detailed introduction to the SQL syntax, see HStream SQL.