创建和管理 Stream
命名资源准则
一个 HStream 资源的名称可以唯一地识别一个 HStream 资源,如一个 stream、 subscription 或 reader。 资源名称必须符合以下要求:
- 以一个字母开头
- 长度必须不超过 255 个字符
- 只包含以下字符:字母 [A-Za-z]、数字 [0-9]、破折号 -、下划线 _。
* 当资源名称作为 SQL 语句的一部分时(例如在 HStream SQL Shell 中,或者用 SQL 创建 IO 任务时),可能会出现资源名称无法被正确解析的情况(如与关键词冲突),此时需要用户用双引号 " 括住资源名称。这个限制可能会在日后的版本中被改进或移除。
Stream 的属性
Replication factor
为了容错性和更高的可用性,每个 Stream 都可以在集群中的节点之间进行复制。一个常用的生产环境 Replication factor 配置是 3,也就是说,你的数据总是有三个副本,这在出错或你想对 Server 进行维护时将会很有帮助。这种复制是以 Stream 为单位进行的。
Backlog Retention
该配置控制 HStreamDB 的 Stream 中的 records 被写入后保留的时间。当超过 retention 保留的时间后,HStreamDB 将会清理这些 records,不管它是否被消费过。
- 默认值=7 天
- 最小值=1 秒
- 最大值=21 天
创建一个 stream
在你写入 records 或者创建一个订阅之前,先创建一个 stream。
package docs.code.examples;
import io.hstream.HStreamClient;
/**
 * Shows how to create HStreamDB streams with the Java client, once with default settings and
 * once with explicit attributes.
 */
public class CreateStreamExample {
  /**
   * Connects to the HStream server and creates two example streams.
   *
   * <p>The service URL defaults to {@code hstream://127.0.0.1:6570}; the {@code serviceUrl}
   * environment variable overrides it when set.
   *
   * @param args unused
   * @throws Exception if building, using or closing the client fails
   */
  public static void main(String[] args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String serviceUrl = "hstream://127.0.0.1:6570";
    String serviceUrlFromEnv = System.getenv("serviceUrl");
    if (serviceUrlFromEnv != null) {
      serviceUrl = serviceUrlFromEnv;
    }

    String streamName1 = "your_h_records_stream_name";
    String streamName2 = "your_raw_records_stream_name";

    // try-with-resources closes the client even when one of the create calls throws;
    // a trailing client.close() would be skipped in that case.
    try (HStreamClient client = HStreamClient.builder().serviceUrl(serviceUrl).build()) {
      createStreamWithAttrsExample(client, streamName1);
      createStreamExample(client, streamName2);
    }
  }

  /**
   * Creates a stream, passing only its name.
   *
   * @param client connected HStream client
   * @param streamName name of the stream to create
   */
  public static void createStreamExample(HStreamClient client, String streamName) {
    client.createStream(streamName);
  }

  /**
   * Creates a stream with an explicit replication factor, shard count and backlog retention.
   *
   * @param client connected HStream client
   * @param streamName name of the stream to create
   */
  public static void createStreamWithAttrsExample(HStreamClient client, String streamName) {
    short replicationFactor = 1;
    int shardCount = 10;
    int backlogRetentionSeconds = 7 * 24 * 3600; // seven days
    client.createStream(streamName, replicationFactor, shardCount, backlogRetentionSeconds);
  }
}
package examples
import (
"log"
"github.com/hstreamdb/hstreamdb-go/hstream"
)
func ExampleCreateStream() error {
client, err := hstream.NewHStreamClient(YourHStreamServiceUrl)
if err != nil {
log.Fatalf("Creating client error: %s", err)
}
defer client.Close()
// Create a stream, only specific streamName
if err = client.CreateStream("testDefaultStream"); err != nil {
log.Fatalf("Creating stream error: %s", err)
}
// Create a new stream with 1 replica, 5 shards, set the data retention to 1800s.
err = client.CreateStream("testStream",
hstream.WithReplicationFactor(1),
hstream.EnableBacklog(1800),
hstream.WithShardCount(5))
if err != nil {
log.Fatalf("Creating stream error: %s", err)
}
return nil
}# https://github.com/hstreamdb/hstreamdb-py/blob/main/examples/snippets/guides.py
import asyncio
import hstreamdb
import os
# NOTE: Replace with your own host and port
host = os.getenv("GUIDE_HOST", "127.0.0.1")
# os.getenv returns a str when the variable is set, so convert explicitly to
# keep `port` an int whether or not GUIDE_PORT is defined.
port = int(os.getenv("GUIDE_PORT", "6570"))
stream_name = "your_stream"
subscription = "your_subscription"
# Run: asyncio.run(main(your_async_function))
async def main(*funcs):
    """Open one insecure client and run each coroutine function with it.

    Each item in ``funcs`` is called with the client and awaited, one after
    another in the given order. The client is used as an async context
    manager, so it is released when the block exits.
    """
    connection = await hstreamdb.insecure_client(host=host, port=port)
    async with connection as client:
        for step in funcs:
            await step(client)
async def create_stream(client):
await client.create_stream(
stream_name, replication_factor=1, backlog=24 * 60 * 60, shard_count=1
)删除一个 Stream
只有当一个 Stream 没有所属的订阅时才允许被删除,除非传入一个强制删除的 flag。
强制删除一个 Stream
如果你需要删除一个有订阅的 stream,请启用强制删除。在强制删除一个 stream 后,原来 stream 的订阅仍然可以从 backlog 中读取数据。这些订阅的 stream 名字会变成 __deleted_stream__。同时,我们并不允许在被删除的 stream 上创建新的订阅,也不允许向该 stream 写入新的 record。
package docs.code.examples;
import io.hstream.HStreamClient;
/**
 * Shows how to delete HStreamDB streams with the Java client, with and without the force flag.
 */
public class DeleteStreamExample {
  /**
   * Connects to the HStream server and deletes two example streams.
   *
   * <p>The service URL defaults to {@code hstream://127.0.0.1:6570}; the {@code serviceUrl}
   * environment variable overrides it when set.
   *
   * @param args unused
   * @throws Exception if building, using or closing the client fails
   */
  public static void main(String[] args) throws Exception {
    // TODO(developer): Replace these variables before running the sample,
    // e.g. set serviceUrl to your own service URL address.
    String serviceUrl = "hstream://127.0.0.1:6570";
    String serviceUrlFromEnv = System.getenv("serviceUrl");
    if (serviceUrlFromEnv != null) {
      serviceUrl = serviceUrlFromEnv;
    }

    String streamName1 = "your_h_records_stream_name";
    String streamName2 = "your_raw_records_stream_name";

    // try-with-resources closes the client even when one of the delete calls throws;
    // a trailing client.close() would be skipped in that case.
    try (HStreamClient client = HStreamClient.builder().serviceUrl(serviceUrl).build()) {
      deleteStreamExample(client, streamName1);
      deleteStreamForceExample(client, streamName2);
    }
  }

  /**
   * Deletes a stream, passing only its name.
   *
   * @param client connected HStream client
   * @param streamName name of the stream to delete
   */
  public static void deleteStreamExample(HStreamClient client, String streamName) {
    client.deleteStream(streamName);
  }

  /**
   * Deletes a stream with the force flag set to {@code true}.
   *
   * @param client connected HStream client
   * @param streamName name of the stream to delete
   */
  public static void deleteStreamForceExample(HStreamClient client, String streamName) {
    boolean force = true;
    client.deleteStream(streamName, force);
  }
}
package examples
import (
"github.com/hstreamdb/hstreamdb-go/hstream"
"log"
)
func ExampleDeleteStream() error {
client, err := hstream.NewHStreamClient(YourHStreamServiceUrl)
if err != nil {
log.Fatalf("Creating client error: %s", err)
}
defer client.Close()
// force delete stream and ignore none exist stream
if err := client.DeleteStream("testStream",
hstream.EnableForceDelete,
hstream.EnableIgnoreNoneExist); err != nil {
log.Fatalf("Deleting stream error: %s", err)
}
if err := client.DeleteStream("testDefaultStream"); err != nil {
log.Fatalf("Deleting stream error: %s", err)
}
return nil
}# https://github.com/hstreamdb/hstreamdb-py/blob/main/examples/snippets/guides.py
import asyncio
import hstreamdb
import os
# NOTE: Replace with your own host and port
host = os.getenv("GUIDE_HOST", "127.0.0.1")
# os.getenv returns a str when the variable is set, so convert explicitly to
# keep `port` an int whether or not GUIDE_PORT is defined.
port = int(os.getenv("GUIDE_PORT", "6570"))
stream_name = "your_stream"
subscription = "your_subscription"
# Run: asyncio.run(main(your_async_function))
async def main(*funcs):
    """Open one insecure client and run each coroutine function with it.

    Each item in ``funcs`` is called with the client and awaited, one after
    another in the given order. The client is used as an async context
    manager, so it is released when the block exits.
    """
    connection = await hstreamdb.insecure_client(host=host, port=port)
    async with connection as client:
        for step in funcs:
            await step(client)
async def delete_stream(client):
await client.delete_stream(stream_name, ignore_non_exist=True, force=True)列出所有 stream 信息
可以如下拿到所有 HStream 中的 stream:
package docs.code.examples;
import io.hstream.HStreamClient;
import io.hstream.Stream;
import java.util.List;
/** Shows how to list all streams of an HStreamDB server with the Java client. */
public class ListStreamsExample {
  /**
   * Connects to the HStream server and prints the name of every stream.
   *
   * <p>The service URL defaults to {@code hstream://127.0.0.1:6570}; the {@code serviceUrl}
   * environment variable overrides it when set.
   *
   * @param args unused
   * @throws Exception if building, using or closing the client fails
   */
  public static void main(String[] args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String serviceUrl = "hstream://127.0.0.1:6570";
    String serviceUrlFromEnv = System.getenv("serviceUrl");
    if (serviceUrlFromEnv != null) {
      serviceUrl = serviceUrlFromEnv;
    }

    // try-with-resources closes the client even when listing throws;
    // a trailing client.close() would be skipped in that case.
    try (HStreamClient client = HStreamClient.builder().serviceUrl(serviceUrl).build()) {
      listStreamExample(client);
    }
  }

  /**
   * Fetches all streams and prints each stream name on its own line to standard output.
   *
   * @param client connected HStream client
   */
  public static void listStreamExample(HStreamClient client) {
    List<Stream> allStreams = client.listStreams();
    for (Stream current : allStreams) {
      System.out.println(current.getStreamName());
    }
  }
}
package examples
import (
"fmt"
"github.com/hstreamdb/hstreamdb-go/hstream"
"log"
)
func ExampleListStreams() error {
client, err := hstream.NewHStreamClient(YourHStreamServiceUrl)
if err != nil {
log.Fatalf("Creating client error: %s", err)
}
defer client.Close()
streams, err := client.ListStreams()
if err != nil {
log.Fatalf("Listing streams error: %s", err)
}
for _, stream := range streams {
fmt.Printf("%+v\n", stream)
}
return nil
}# https://github.com/hstreamdb/hstreamdb-py/blob/main/examples/snippets/guides.py
import asyncio
import hstreamdb
import os
# NOTE: Replace with your own host and port
host = os.getenv("GUIDE_HOST", "127.0.0.1")
# os.getenv returns a str when the variable is set, so convert explicitly to
# keep `port` an int whether or not GUIDE_PORT is defined.
port = int(os.getenv("GUIDE_PORT", "6570"))
stream_name = "your_stream"
subscription = "your_subscription"
# Run: asyncio.run(main(your_async_function))
async def main(*funcs):
    """Open one insecure client and run each coroutine function with it.

    Each item in ``funcs`` is called with the client and awaited, one after
    another in the given order. The client is used as an async context
    manager, so it is released when the block exits.
    """
    connection = await hstreamdb.insecure_client(host=host, port=port)
    async with connection as client:
        for step in funcs:
            await step(client)
async def list_streams(client):
    """Fetch all streams through ``client`` and print each one on its own line."""
    for stream in await client.list_streams():
        print(stream)