Skip to content
Open in Gitpod

创建和管理 Stream

命名资源准则

HStream 资源的名称可以唯一地标识一个 HStream 资源,如 stream、subscription 或 reader。资源名称必须符合以下要求:

  • 以一个字母开头
  • 长度必须不超过 255 个字符
  • 只包含以下字符:字母 [A-Za-z]、数字 [0-9]、连字符 -、下划线 _

* 当资源名称作为 SQL 语句的一部分使用时(例如在 HStream SQL Shell 中,或者用 SQL 创建 IO 任务时),可能会出现资源名称无法被正确解析的情况(如与关键词冲突),此时需要用户用双引号 " 括住资源名称。这个限制可能会在日后的版本中被改进或移除。

Stream 的属性

  • Replication factor

    为了容错性和更高的可用性,每个 Stream 都可以在集群中的节点之间进行复制。生产环境中常用的 Replication factor 配置为 3,也就是说,你的数据总是有三个副本,这在出错或你想对 Server 进行维护时将会很有帮助。这种复制是以 Stream 为单位进行的。

  • Backlog Retention

    该配置控制 HStreamDB 的 Stream 中的 records 被写入后保留的时间。当超过 retention 保留的时间后,HStreamDB 将会清理这些 records,不管它是否被消费过。

    • 默认值=7 天
    • 最小值=1 秒
    • 最大值=21 天

创建一个 stream

在写入 records 或者创建订阅之前,需要先创建一个 stream。

java
package docs.code.examples;

import io.hstream.HStreamClient;

/**
 * Sample showing two ways to create a stream with {@link HStreamClient}: with only a name, and
 * with an explicit replication factor, shard count and backlog retention.
 */
public class CreateStreamExample {
  /**
   * Builds a client for the configured service URL, creates two sample streams and closes the
   * client.
   *
   * <p>The service URL is {@code hstream://127.0.0.1:6570} unless the {@code serviceUrl}
   * environment variable is set, in which case that value is used.
   *
   * @param args unused
   * @throws Exception propagated from the client calls or from closing the client
   */
  public static void main(String[] args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String serviceUrl = "hstream://127.0.0.1:6570";
    String serviceUrlFromEnv = System.getenv("serviceUrl");
    if (serviceUrlFromEnv != null) {
      serviceUrl = serviceUrlFromEnv;
    }

    String streamName1 = "your_h_records_stream_name";
    String streamName2 = "your_raw_records_stream_name";

    // try-with-resources guarantees the client is closed even if a create call throws.
    try (HStreamClient client = HStreamClient.builder().serviceUrl(serviceUrl).build()) {
      createStreamWithAttrsExample(client, streamName1);
      createStreamExample(client, streamName2);
    }
  }

  /**
   * Creates a stream by passing only its name to the client.
   *
   * @param client client used to issue the request
   * @param streamName name of the stream to create
   */
  public static void createStreamExample(HStreamClient client, String streamName) {
    client.createStream(streamName);
  }

  /**
   * Creates a stream with explicit attributes: replication factor 1, 10 shards and a backlog
   * retention of 7 days expressed in seconds.
   *
   * @param client client used to issue the request
   * @param streamName name of the stream to create
   */
  public static void createStreamWithAttrsExample(HStreamClient client, String streamName) {
    short replicationFactor = 1;
    int shardCount = 10;
    int backlogRetentionSeconds = 7 * 24 * 3600;
    client.createStream(streamName, replicationFactor, shardCount, backlogRetentionSeconds);
  }
}
go
package examples

import (
	"log"

	"github.com/hstreamdb/hstreamdb-go/hstream"
)

// ExampleCreateStream connects to the server at YourHStreamServiceUrl and
// creates two streams: "testDefaultStream", passing only the stream name, and
// "testStream" with explicit options.
//
// Every failure is reported through log.Fatalf, which terminates the process,
// so the only value this function ever returns is nil.
func ExampleCreateStream() error {
	client, connErr := hstream.NewHStreamClient(YourHStreamServiceUrl)
	if connErr != nil {
		log.Fatalf("Creating client error: %s", connErr)
	}
	defer client.Close()

	// Create a stream by specifying only its name.
	createErr := client.CreateStream("testDefaultStream")
	if createErr != nil {
		log.Fatalf("Creating stream error: %s", createErr)
	}

	// Create a second stream with 1 replica, 5 shards and a data retention of 1800s.
	if createErr = client.CreateStream(
		"testStream",
		hstream.WithReplicationFactor(1),
		hstream.EnableBacklog(1800),
		hstream.WithShardCount(5),
	); createErr != nil {
		log.Fatalf("Creating stream error: %s", createErr)
	}

	return nil
}
python
# https://github.com/hstreamdb/hstreamdb-py/blob/main/examples/snippets/guides.py
import asyncio
import hstreamdb
import os

# NOTE: Replace with your own host and port
# Both can be overridden through the GUIDE_HOST / GUIDE_PORT environment variables.
host = os.getenv("GUIDE_HOST", "127.0.0.1")
# Environment values are always strings; convert so that `port` is an int
# whether it comes from GUIDE_PORT or from the default.
port = int(os.getenv("GUIDE_PORT", "6570"))
# Placeholder resource names; replace with your own.
stream_name = "your_stream"
subscription = "your_subscription"


# Usage: asyncio.run(main(your_async_function))
async def main(*funcs):
    """Run the given coroutine functions against a single shared client.

    A client is obtained from ``hstreamdb.insecure_client`` using the
    module-level ``host`` and ``port``. Each element of ``funcs`` is then
    called with that client and awaited, one after another in the order
    given, inside the client's async context.
    """
    connection = await hstreamdb.insecure_client(host=host, port=port)
    async with connection as client:
        for run_example in funcs:
            await run_example(client)


async def create_stream(client):
    """Create the stream named by the module-level ``stream_name``.

    The stream is requested with a replication factor of 1, a single shard
    and a backlog of ``24 * 60 * 60``.
    """
    # 24 * 60 * 60 is presumably one day expressed in seconds; confirm the unit
    # against the client API.
    backlog_duration = 24 * 60 * 60
    options = {
        "replication_factor": 1,
        "backlog": backlog_duration,
        "shard_count": 1,
    }
    await client.create_stream(stream_name, **options)

删除一个 Stream

只有当一个 Stream 没有所属的订阅时才允许被删除,除非传入一个强制删除的 flag。

强制删除一个 Stream

如果你需要删除一个有订阅的 stream,请启用强制删除。在强制删除一个 stream 后,原来 stream 的订阅仍然可以从 backlog 中读取数据。这些订阅的 stream 名字会变成 __deleted_stream__。同时,我们不允许在被删除的 stream 上创建新的订阅,也不允许向该 stream 写入新的 record。

java
package docs.code.examples;

import io.hstream.HStreamClient;

/**
 * Sample showing how to delete a stream with {@link HStreamClient}, both normally and with the
 * force flag set.
 */
public class DeleteStreamExample {
  /**
   * Builds a client for the configured service URL, deletes two sample streams and closes the
   * client.
   *
   * <p>The service URL is {@code hstream://127.0.0.1:6570} unless the {@code serviceUrl}
   * environment variable is set, in which case that value is used.
   *
   * @param args unused
   * @throws Exception propagated from the client calls or from closing the client
   */
  public static void main(String[] args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String serviceUrl = "hstream://127.0.0.1:6570";
    String serviceUrlFromEnv = System.getenv("serviceUrl");
    if (serviceUrlFromEnv != null) {
      serviceUrl = serviceUrlFromEnv;
    }

    String streamName1 = "your_h_records_stream_name";
    String streamName2 = "your_raw_records_stream_name";

    // try-with-resources guarantees the client is closed even if a delete call throws.
    try (HStreamClient client = HStreamClient.builder().serviceUrl(serviceUrl).build()) {
      deleteStreamExample(client, streamName1);
      deleteStreamForceExample(client, streamName2);
    }
  }

  /**
   * Deletes a stream by passing only its name to the client.
   *
   * @param client client used to issue the request
   * @param streamName name of the stream to delete
   */
  public static void deleteStreamExample(HStreamClient client, String streamName) {
    client.deleteStream(streamName);
  }

  /**
   * Deletes a stream with the second argument of {@code deleteStream} set to {@code true}, i.e.
   * as a forced delete.
   *
   * @param client client used to issue the request
   * @param streamName name of the stream to delete
   */
  public static void deleteStreamForceExample(HStreamClient client, String streamName) {
    boolean force = true;
    client.deleteStream(streamName, force);
  }
}
go
package examples

import (
	"github.com/hstreamdb/hstreamdb-go/hstream"
	"log"
)

// ExampleDeleteStream connects to the server at YourHStreamServiceUrl and
// deletes two streams: "testStream" with the force-delete and
// ignore-non-existent options, then "testDefaultStream" with no options.
//
// Every failure is reported through log.Fatalf, which terminates the process,
// so the only value this function ever returns is nil.
func ExampleDeleteStream() error {
	client, connErr := hstream.NewHStreamClient(YourHStreamServiceUrl)
	if connErr != nil {
		log.Fatalf("Creating client error: %s", connErr)
	}
	defer client.Close()

	// Force-delete the stream and ignore the case where it does not exist.
	forcedErr := client.DeleteStream(
		"testStream",
		hstream.EnableForceDelete,
		hstream.EnableIgnoreNoneExist,
	)
	if forcedErr != nil {
		log.Fatalf("Deleting stream error: %s", forcedErr)
	}

	// Plain delete, no options.
	plainErr := client.DeleteStream("testDefaultStream")
	if plainErr != nil {
		log.Fatalf("Deleting stream error: %s", plainErr)
	}

	return nil
}
python
# https://github.com/hstreamdb/hstreamdb-py/blob/main/examples/snippets/guides.py
import asyncio
import hstreamdb
import os

# NOTE: Replace with your own host and port
# Both can be overridden through the GUIDE_HOST / GUIDE_PORT environment variables.
host = os.getenv("GUIDE_HOST", "127.0.0.1")
# Environment values are always strings; convert so that `port` is an int
# whether it comes from GUIDE_PORT or from the default.
port = int(os.getenv("GUIDE_PORT", "6570"))
# Placeholder resource names; replace with your own.
stream_name = "your_stream"
subscription = "your_subscription"


# Usage: asyncio.run(main(your_async_function))
async def main(*funcs):
    """Run the given coroutine functions against a single shared client.

    A client is obtained from ``hstreamdb.insecure_client`` using the
    module-level ``host`` and ``port``. Each element of ``funcs`` is then
    called with that client and awaited, one after another in the order
    given, inside the client's async context.
    """
    connection = await hstreamdb.insecure_client(host=host, port=port)
    async with connection as client:
        for run_example in funcs:
            await run_example(client)


async def delete_stream(client):
    """Delete the stream named by the module-level ``stream_name``.

    The request is sent with both ``ignore_non_exist`` and ``force`` set to
    ``True``.
    """
    delete_flags = {"ignore_non_exist": True, "force": True}
    await client.delete_stream(stream_name, **delete_flags)

列出所有 stream 信息

可以通过如下方式获取 HStream 中的所有 stream:

java
package docs.code.examples;

import io.hstream.HStreamClient;
import io.hstream.Stream;
import java.util.List;

/** Sample showing how to list streams with {@link HStreamClient} and print their names. */
public class ListStreamsExample {
  /**
   * Builds a client for the configured service URL, prints the name of every listed stream and
   * closes the client.
   *
   * <p>The service URL is {@code hstream://127.0.0.1:6570} unless the {@code serviceUrl}
   * environment variable is set, in which case that value is used.
   *
   * @param args unused
   * @throws Exception propagated from the client calls or from closing the client
   */
  public static void main(String[] args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String serviceUrl = "hstream://127.0.0.1:6570";
    String serviceUrlFromEnv = System.getenv("serviceUrl");
    if (serviceUrlFromEnv != null) {
      serviceUrl = serviceUrlFromEnv;
    }

    // try-with-resources guarantees the client is closed even if listing throws.
    try (HStreamClient client = HStreamClient.builder().serviceUrl(serviceUrl).build()) {
      listStreamExample(client);
    }
  }

  /**
   * Retrieves the streams returned by {@code client.listStreams()} and prints each stream's name
   * on its own line to standard output.
   *
   * @param client client used to issue the request
   */
  public static void listStreamExample(HStreamClient client) {
    List<Stream> streams = client.listStreams();
    for (Stream stream : streams) {
      System.out.println(stream.getStreamName());
    }
  }
}
go
package examples

import (
	"fmt"
	"github.com/hstreamdb/hstreamdb-go/hstream"
	"log"
)

// ExampleListStreams connects to the server at YourHStreamServiceUrl, lists
// the streams and prints each one with the %+v verb on its own line.
//
// Every failure is reported through log.Fatalf, which terminates the process,
// so the only value this function ever returns is nil.
func ExampleListStreams() error {
	client, connErr := hstream.NewHStreamClient(YourHStreamServiceUrl)
	if connErr != nil {
		log.Fatalf("Creating client error: %s", connErr)
	}
	defer client.Close()

	allStreams, listErr := client.ListStreams()
	if listErr != nil {
		log.Fatalf("Listing streams error: %s", listErr)
	}

	for _, item := range allStreams {
		fmt.Printf("%+v\n", item)
	}

	return nil
}
python
# https://github.com/hstreamdb/hstreamdb-py/blob/main/examples/snippets/guides.py
import asyncio
import hstreamdb
import os

# NOTE: Replace with your own host and port
# Both can be overridden through the GUIDE_HOST / GUIDE_PORT environment variables.
host = os.getenv("GUIDE_HOST", "127.0.0.1")
# Environment values are always strings; convert so that `port` is an int
# whether it comes from GUIDE_PORT or from the default.
port = int(os.getenv("GUIDE_PORT", "6570"))
# Placeholder resource names; replace with your own.
stream_name = "your_stream"
subscription = "your_subscription"


# Usage: asyncio.run(main(your_async_function))
async def main(*funcs):
    """Run the given coroutine functions against a single shared client.

    A client is obtained from ``hstreamdb.insecure_client`` using the
    module-level ``host`` and ``port``. Each element of ``funcs`` is then
    called with that client and awaited, one after another in the order
    given, inside the client's async context.
    """
    connection = await hstreamdb.insecure_client(host=host, port=port)
    async with connection as client:
        for run_example in funcs:
            await run_example(client)


async def list_streams(client):
    """Fetch the result of ``client.list_streams()`` and print each item on its own line."""
    for stream in await client.list_streams():
        print(stream)