CN114327800A - Method and device for writing in topic, processor and stream processing platform - Google Patents
- Publication number
- CN114327800A (application CN202111633573.4A)
- Authority
- CN
- China
- Prior art keywords
- records
- topic
- partition
- writing
- current transaction
- Prior art date
- Legal status (The legal status is an assumption and is not a legal conclusion. Google has not performed a legal analysis and makes no representation as to the accuracy of the status listed.)
- Pending
Landscapes
- Information Retrieval, Db Structures And Fs Structures Therefor (AREA)
Abstract
Description
Technical Field
The present application relates to the field of topic writing, and in particular to a topic writing method, a writing device, a computer-readable storage medium, a processor, and a stream processing platform.
Background
In Kafka, records destined for multiple topics can be written together by submitting them in the same transaction. Flink's native Kafka connector uses Kafka transactions only in Semantic.EXACTLY_ONCE mode, where they guarantee that a single record is written exactly once; the connector does not support writing to multiple topics at the same time. Currently, multi-topic writing in Flink can be implemented by adding multiple sinks, with one sink corresponding to one Kafka connector, as follows:
(1) For each topic, create one Kafka connector, FlinkKafkaProducer;
(2) Add the created FlinkKafkaProducer instances to the task topology one by one through the addSink() method of DataStream.
Although the multi-sink scheme allows one record to be sent to multiple topics, it has several disadvantages:
(1) Strong consistency between topics cannot be guaranteed. The writes to the different topics are not in the same transaction, so there is no guarantee that they succeed or fail together, and strong consistency between topics therefore cannot be guaranteed;
(2) Task processing performance is reduced. With thousands of topics, thousands of sinks must be created. For Flink, each additional node means that resources must be allocated to run the task corresponding to that node, and routing data to a sink also requires network transmission. Creating thousands of sinks therefore occupies a large amount of resources and greatly reduces task processing performance.
The above information disclosed in this Background section is only intended to enhance understanding of the background of the technology described herein. It may therefore contain information that does not form prior art already known in this country to a person of ordinary skill in the art.
Summary of the Invention
The main purpose of the present application is to provide a topic writing method, a writing device, a computer-readable storage medium, a processor, and a stream processing platform, so as to solve the problem in the prior art that multi-topic writing occupies too many resources.
According to one aspect of the embodiments of the present invention, a topic writing method is provided, including: acquiring incoming data of a stream processing platform to obtain target data; generating a plurality of records according to the target data, where each record includes a partition field that represents the correspondence between the record and a topic of the stream processing platform; and invoking the current transaction and writing the plurality of records into the corresponding topics according to the partition field, where the records correspond one-to-one with the topics.
Optionally, before invoking the current transaction and writing the plurality of records into the corresponding topics according to the partition field, the method further includes: querying the state of the current transaction; and, when the current transaction is in a to-be-committed state, adding the current transaction to a set of transactions to be committed and generating a new current transaction.
Optionally, after invoking the current transaction and writing the plurality of records into the corresponding topics according to the partition field, the method further includes: adding the current transaction to the set of transactions to be committed; and committing the transactions in the set in the order in which they were added.
Optionally, generating a plurality of records according to the target data includes: obtaining a corresponding task configuration file according to the task code of the target data, where the configuration in the task configuration file includes an output field determination mode and a partition mode; and processing the target data according to the output field determination mode and the partition mode to generate the plurality of records.
Optionally, the output field determination mode includes a mode of outputting all fields and a mode of outputting a specified subset of fields.
Optionally, the partition mode includes a custom partition mode and a default partition mode.
According to another aspect of the embodiments of the present invention, a topic writing device is further provided, including: an acquiring unit, configured to acquire incoming data of a stream processing platform to obtain target data; a generating unit, configured to generate a plurality of records according to the target data, where each record includes a partition field that represents the correspondence between the record and a topic of the stream processing platform; and a writing unit, configured to invoke the current transaction and write the plurality of records into the corresponding topics according to the partition field, where the records correspond one-to-one with the topics.
According to yet another aspect of the embodiments of the present invention, a computer-readable storage medium is further provided. The computer-readable storage medium includes a stored program, where the program executes any one of the methods described above.
According to still another aspect of the embodiments of the present invention, a processor is further provided. The processor is configured to run a program, where the program, when run, executes any one of the methods described above.
According to a further aspect of the embodiments of the present invention, a stream processing platform is further provided, including a connector, one or more processors, a memory, a display device, and one or more programs, where the connector is used to write records into topics, the one or more programs are stored in the memory and configured to be executed by the one or more processors, and the one or more programs include instructions for executing any one of the methods described above.
In the embodiments of the present invention, the topic writing method first acquires incoming data of the stream processing platform to obtain target data; then generates a plurality of records according to the target data, where each record includes a partition field that represents the correspondence between the record and a topic of the stream processing platform; and finally invokes the current transaction and writes the plurality of records into the corresponding topics according to the partition field, where the records correspond one-to-one with the topics. By invoking the current transaction, the method writes multiple records into their corresponding topics. Because the partition field represents the correspondence between a record and a topic, a single connector is enough to write to multiple topics, and a single connector requires only one node. Compared with the prior art, which creates one node per topic, this greatly reduces resource usage and solves the problem that multi-topic writing occupies too many resources.
Description of Drawings
The accompanying drawings, which form a part of the present application, are provided for further understanding of the present application. The illustrative embodiments of the present application and their descriptions are used to explain the present application and do not constitute an improper limitation on it. In the drawings:
FIG. 1 shows a schematic diagram of a topic writing method according to an embodiment of the present application;
FIG. 2 shows a schematic diagram of a topic writing device according to an embodiment of the present application;
FIG. 3 shows a schematic diagram of the processing logic of a connector according to an embodiment of the present application.
Detailed Description
It should be noted that the embodiments of the present application and the features of the embodiments may be combined with each other where no conflict arises. The present application is described in detail below with reference to the accompanying drawings and in conjunction with the embodiments.
To help those skilled in the art better understand the solutions of the present application, the technical solutions in the embodiments of the present application are described clearly and completely below with reference to the accompanying drawings. The described embodiments are only some of the embodiments of the present application, not all of them. All other embodiments obtained by a person of ordinary skill in the art based on the embodiments of the present application, without creative effort, shall fall within the scope of protection of the present application.
It should be noted that the terms "first", "second", and the like in the description, the claims, and the above drawings of the present application are used to distinguish similar objects and do not necessarily describe a specific order or sequence. It should be understood that data so used are interchangeable under appropriate circumstances, so that the embodiments of the present application described herein can be implemented. Furthermore, the terms "comprising" and "having", and any variations thereof, are intended to cover non-exclusive inclusion. For example, a process, method, system, product, or device that comprises a series of steps or units is not necessarily limited to the steps or units expressly listed, and may include other steps or units that are not expressly listed or that are inherent to the process, method, product, or device.
For ease of description, some of the terms used in the embodiments of the present application are explained below:
Flink: Apache Flink is a framework and distributed processing engine for stateful computation over unbounded and bounded data streams.
Kafka: Apache Kafka is a distributed stream processing platform.
topic: a topic in Kafka is the category or feed name to which records are published.
FlinkKafkaProducer: Flink's native Kafka connector, used to write data into Kafka topics.
Checkpoint: a fault-tolerance mechanism for state in Flink. It persists the state of operators to form checkpoints that are used for failure recovery.
As stated in the Background, multi-topic writing in the prior art occupies too many resources. To solve this problem, a typical implementation of the present application provides a topic writing method, a writing device, a computer-readable storage medium, a processor, and a stream processing platform.
According to an embodiment of the present application, a topic writing method is provided.
FIG. 1 is a flowchart of a topic writing method according to an embodiment of the present application. As shown in FIG. 1, the method includes the following steps:
Step S101: acquire incoming data of the stream processing platform to obtain target data;
Step S102: generate a plurality of records according to the target data, where each record includes a partition field that represents the correspondence between the record and a topic of the stream processing platform;
Step S103: invoke the current transaction and write the plurality of records into the corresponding topics according to the partition field, where the records correspond one-to-one with the topics.
In the topic writing method above, incoming data of the stream processing platform is first acquired to obtain target data; a plurality of records is then generated according to the target data, where each record includes a partition field that represents the correspondence between the record and a topic of the stream processing platform; finally, the current transaction is invoked and the plurality of records is written into the corresponding topics according to the partition field, where the records correspond one-to-one with the topics. By invoking the current transaction, the method writes multiple records into their corresponding topics. Because the partition field represents the correspondence between a record and a topic, a single connector is enough to write to multiple topics, and a single connector requires only one node. Compared with the prior art, which creates one node per topic, this greatly reduces resource usage and solves the problem that multi-topic writing occupies too many resources.
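Steps S101 to S103 can be illustrated with a minimal in-memory sketch. It is not the patented connector and does not use the Flink or Kafka APIs: the `Record`, `Transaction`, `generate_records`, and `write` names are hypothetical, and a plain dictionary stands in for the broker. The sketch only shows the idea that each record carries its destination topic and that all records go through one transaction, so they become visible together or not at all.

```python
from dataclasses import dataclass, field


@dataclass(frozen=True)
class Record:
    topic: str   # stands in for the "partition field": names the destination topic
    value: dict  # payload derived from the target data


@dataclass
class Transaction:
    """Buffers records for several topics; nothing is visible until commit()."""
    buffered: list = field(default_factory=list)

    def send(self, record):
        self.buffered.append(record)

    def commit(self, broker):
        # All buffered records are applied to their topics in one step.
        for record in self.buffered:
            broker.setdefault(record.topic, []).append(record.value)
        self.buffered.clear()

    def abort(self):
        # Discard everything that was buffered; the broker is untouched.
        self.buffered.clear()


def generate_records(target_data, topics):
    """S102: build one record per configured topic, tagged with that topic."""
    return [Record(topic=t, value=dict(target_data)) for t in topics]


def write(target_data, topics, txn):
    """S103: send every generated record through the same current transaction."""
    for record in generate_records(target_data, topics):
        txn.send(record)
```

With this model, a single `Transaction` object plays the role of the one connector node: adding a topic adds a record, not a sink.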
In practice, multiple topic names can be configured under the kafka node. For example, tb-dpdtl-curt-acc-topic and dpst-open-clos-acct-book-info-topic can both be topic names.
It should be noted that the steps shown in the flowchart of the accompanying drawings may be executed in a computer system, for example as a set of computer-executable instructions, and that, although a logical order is shown in the flowchart, in some cases the steps shown or described may be executed in a different order.
In one embodiment of the present application, before invoking the current transaction and writing the plurality of records into the corresponding topics according to the partition field, the method further includes: querying the state of the current transaction; and, when the current transaction is in a to-be-committed state, adding the current transaction to the set of transactions to be committed and generating a new current transaction. Subsequent records are then written into their corresponding topics by invoking the new current transaction according to the partition field in each record. This further enables writing to multiple topics through one connector, further addresses the excessive resource usage of multi-topic writing in the prior art, and further ensures strong consistency of data between topics.
Specifically, in practice, when Flink initiates a checkpoint, it calls the connector's snapshotState() method to take a snapshot. The preCommit() method is called to add the current transaction to the set of transactions to be committed, and the beginTransactionInternal() method is called to generate a new current transaction.
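The checkpoint-time behavior described here can be sketched in plain Python. This is an illustrative model only, not Flink code: the `MultiTopicSink` class and its snake_case methods are hypothetical stand-ins that mirror the roles of snapshotState(), preCommit(), and beginTransactionInternal(), and transactions are represented as simple dictionaries.

```python
import itertools


class MultiTopicSink:
    """Hypothetical stand-in for the connector's transaction bookkeeping."""

    def __init__(self):
        self._ids = itertools.count(1)
        self.pending_commit = []  # transactions awaiting commit, oldest first
        self.current = self._begin_transaction()

    def _begin_transaction(self):
        # Plays the role of beginTransactionInternal(): a fresh, empty transaction.
        return {"id": next(self._ids), "records": []}

    def invoke(self, topic, value):
        # Every incoming record is buffered in the current transaction.
        self.current["records"].append((topic, value))

    def snapshot_state(self):
        # Plays the role of snapshotState() plus preCommit(): the current
        # transaction joins the to-be-committed set and a new one is opened.
        self.pending_commit.append(self.current)
        self.current = self._begin_transaction()
        # Return the ids that would be stored in the checkpoint.
        return [txn["id"] for txn in self.pending_commit]
```

After `snapshot_state()` returns, records arriving before the checkpoint completes land in the new transaction and do not affect the one that is waiting to be committed.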
In a specific embodiment of the present application, when the current transaction is in a commit-failed state, the transaction is rolled back: the operations performed in the transaction are not saved, and the system is restored to its last correct state. Consider a money transfer in which an error occurs partway through, so that money has already been deducted from the requester's bank account but has not been added to the other party's bank account. In this scenario the transaction must be rolled back, meaning that the deduction from the requester's account is not saved. The system thereby returns to the state before the transaction was executed, in which no money has been deducted from the requester's account and none has been added to the other party's account.
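The transfer scenario can be written as a small sketch of rollback semantics. The `transfer` function and the dictionary of account balances are hypothetical and purely illustrative; real Kafka transactions abort on the broker side rather than by restoring a local copy.

```python
def transfer(accounts, src, dst, amount):
    """Move `amount` from src to dst; on any error, restore the prior state."""
    snapshot = dict(accounts)  # state before the transaction starts
    try:
        if accounts[src] < amount:
            raise ValueError("insufficient funds")
        accounts[src] -= amount
        accounts[dst] += amount  # raises KeyError if dst does not exist
    except Exception:
        # Roll back: discard the partial deduction and restore the snapshot.
        accounts.clear()
        accounts.update(snapshot)
        return False
    return True
```

If the destination account is missing, the deduction from the source has already happened when the error is raised, which is exactly the partial state the rollback removes.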
In another embodiment of the present application, after invoking the current transaction and writing the plurality of records into the corresponding topics according to the partition field, the method further includes: adding the current transaction to the set of transactions to be committed; and committing the transactions in the set in the order in which they were added. In this embodiment, the transactions in the set are committed in the order in which they were added to it, so that whether each transaction was executed successfully can be known in a timely manner.
In a specific embodiment of the present application, after the checkpoint completes, Flink calls back the connector's notifyCheckpointComplete() method, which calls the commit() method to commit the pending transactions one by one.
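Committing pending transactions in insertion order can be sketched as follows. This is a simplified model under stated assumptions, not Flink's notifyCheckpointComplete(): the function name, the dictionary-shaped transactions, and the dictionary broker are all hypothetical.

```python
from collections import deque


def notify_checkpoint_complete(pending, broker):
    """Commit every pending transaction, oldest first; return the committed ids."""
    committed = []
    while pending:
        txn = pending.popleft()  # first added, first committed
        for topic, value in txn["records"]:
            broker.setdefault(topic, []).append(value)
        committed.append(txn["id"])
    return committed
```

A `deque` is used here because it makes the first-in, first-out order explicit; any ordered collection that preserves insertion order would serve the same purpose.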
To generate the plurality of records simply and efficiently, in another embodiment of the present application, generating a plurality of records according to the target data includes: obtaining a corresponding task configuration file according to the task code of the target data, where the configuration in the task configuration file includes an output field determination mode and a partition mode; and processing the target data according to the output field determination mode and the partition mode to generate the plurality of records.
In a specific embodiment of the present application, when a record arrives, the connector's invoke() method is called. In the invoke() method, a plurality of records is generated in one-to-one correspondence with the topics, and the current transaction is used to send the records.
In practice, the task configuration file can be a YAML task configuration file.
In yet another embodiment of the present application, the output field determination mode includes a mode of outputting all fields and a mode of outputting a specified subset of fields. This allows the target data to be processed flexibly according to the output field determination mode and further ensures that the plurality of records is obtained efficiently.
Specifically, the all_fields node indicates whether all fields are output. When all_fields is true, all fields are output; when all_fields is false, only a specified subset of fields is output, and the fields to output are specified by configuring the optional_fields node.
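The effect of the all_fields and optional_fields nodes can be sketched with a small helper. The function name `select_output_fields` is hypothetical, the configuration is shown as an already-parsed dictionary rather than a YAML file, and treating a missing all_fields node as true is an assumption that the source does not state.

```python
def select_output_fields(row, config):
    """Return the fields of `row` to emit, based on the task configuration."""
    # Assumption: when all_fields is absent, behave as if it were true.
    if config.get("all_fields", True):
        return dict(row)
    # Otherwise keep only the fields listed under optional_fields,
    # silently skipping names that the row does not contain.
    wanted = config.get("optional_fields", [])
    return {name: row[name] for name in wanted if name in row}
```

For example, a row with account, amount, and remark fields and a configuration of `{"all_fields": False, "optional_fields": ["account", "amount"]}` would be emitted without the remark field.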
To allow the target data to be processed flexibly according to the partition mode when generating the plurality of records, in one embodiment of the present application the partition mode includes a custom partition mode and a default partition mode.
Specifically, the partition_field node is used to configure the partition mode, and the partition_class node can be used to configure a custom partition mode. Custom partitioning is achieved by configuring either one of the partition_field node and the partition_class node. When neither the partition_field node nor the partition_class node is configured, the default partition mode, round-robin, is used.
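The choice among the three partition modes can be sketched as a factory function. Several details here are assumptions that the source does not specify: the `make_partitioner` name, the `custom_classes` registry, giving partition_class precedence when both nodes are present, and hashing the value of the partition_field with CRC32 to pick a partition.

```python
import itertools
import zlib


def make_partitioner(config, num_partitions, custom_classes=None):
    """Return a function mapping a row to a partition index."""
    custom_classes = custom_classes or {}
    if "partition_class" in config:
        # Custom mode: look up a user-supplied partitioner by name (assumed registry).
        return custom_classes[config["partition_class"]]
    if "partition_field" in config:
        # Custom mode: derive the partition from one field's value (assumed hashing).
        name = config["partition_field"]
        return lambda row: zlib.crc32(str(row[name]).encode()) % num_partitions
    # Default mode: round-robin over the available partitions.
    counter = itertools.count()
    return lambda row: next(counter) % num_partitions
```

CRC32 is used instead of Python's built-in `hash()` because string hashing is randomized per process, which would send the same key to different partitions after a restart.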
An embodiment of the present application further provides a topic writing device. It should be noted that the topic writing device of this embodiment can be used to execute the topic writing method provided by the embodiments of the present application. The topic writing device is described below.
FIG. 2 is a schematic diagram of a topic writing device according to an embodiment of the present application. As shown in FIG. 2, the device includes:
an acquiring unit 10, configured to acquire incoming data of the stream processing platform to obtain target data;
a generating unit 20, configured to generate a plurality of records according to the target data, where each record includes a partition field that represents the correspondence between the record and a topic of the stream processing platform;
a writing unit 30, configured to invoke the current transaction and write the plurality of records into the corresponding topics according to the partition field, where the records correspond one-to-one with the topics.
In the topic writing device above, the acquiring unit acquires incoming data of the stream processing platform to obtain target data; the generating unit generates a plurality of records according to the target data, where each record includes a partition field that represents the correspondence between the record and a topic of the stream processing platform; and the writing unit invokes the current transaction and writes the plurality of records into the corresponding topics according to the partition field, where the records correspond one-to-one with the topics. By invoking the current transaction, the device writes multiple records into their corresponding topics. Because the partition field represents the correspondence between a record and a topic, a single connector is enough to write to multiple topics, and a single connector requires only one node. Compared with the prior art, which creates one node per topic, this greatly reduces resource usage and solves the problem that multi-topic writing occupies too many resources.
In practice, multiple topic names can be configured under the kafka node. For example, tb-dpdtl-curt-acc-topic and dpst-open-clos-acct-book-info-topic can both be topic names.
In one embodiment of the present application, the device further includes a query unit and a first adding unit. The query unit is configured to query the state of the current transaction before the current transaction is invoked and the plurality of records is written into the corresponding topics according to the partition field. The first adding unit is configured to, when the current transaction is in a to-be-committed state, add the current transaction to the set of transactions to be committed and generate a new current transaction. Subsequent records are then written into their corresponding topics by invoking the new current transaction according to the partition field in each record. This further enables writing to multiple topics through one connector, further addresses the excessive resource usage of multi-topic writing in the prior art, and further ensures strong consistency of data between topics.
具体地,在实际的应用过程中,当Flink发起checkpoint时,会调用连接器的snapshotState()方法进行快照;调用preCommit()方法将当前事务添加到待提交的事务集合,同时调用beginTransactionInternal()方法生成新的当前事务。Specifically, in the actual application process, when Flink initiates a checkpoint, it will call the snapshotState() method of the connector to take a snapshot; call the preCommit() method to add the current transaction to the set of transactions to be committed, and call the beginTransactionInternal() method at the same time Generate a new current transaction.
本申请的一种具体的实施例中,在上述当前事务处于提交失败状态的情况下,会进行事务的回滚,不保存对上述事务的操作,并恢复到上一次正确状态的行为。例如,在进行转账的场景下,如果转账的过程中出现了错误,即请求方的银行账户的钱已经扣除,而对方的银行账户的钱并没有增加,这样便会出现错误,因此在这种场景下,需要进行事务的回滚,即对请求的银行账户已扣钱这一操作,并不保存,这样便可以回到未执行该事务之前的状态,即请求方的银行账号的钱未扣除成功,对方的银行账号的钱也未增加的这一状态。In a specific embodiment of the present application, when the current transaction is in a commit failure state, the transaction is rolled back, the operations on the transaction are not saved, and the behavior is restored to the last correct state. For example, in the scenario of transferring money, if there is an error in the transfer process, that is, the money in the bank account of the requesting party has been deducted, but the money in the bank account of the other party has not increased, so an error will occur, so in this kind of In the scenario, the transaction needs to be rolled back, that is, the operation of deducting money from the requested bank account is not saved, so that it can return to the state before the transaction is executed, that is, the money in the requester's bank account has not been deducted. If successful, the money in the other party's bank account has not increased.
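The rollback behavior in the transfer example can be illustrated with a small, self-contained sketch. It is a plain-Python simulation rather than anything from the patent or from Flink; the function name transfer, the fail_after_debit flag, and the account names are hypothetical and exist only for illustration.

```python
import copy


def transfer(accounts, src, dst, amount, fail_after_debit=False):
    """Move `amount` from src to dst; restore the previous state on any error.

    `fail_after_debit` is a hypothetical switch that simulates a failure
    between the debit and the credit.
    """
    snapshot = copy.deepcopy(accounts)  # last known-correct state
    try:
        if accounts[src] < amount:
            raise ValueError("insufficient funds")
        accounts[src] -= amount          # debit the requester
        if fail_after_debit:
            raise RuntimeError("simulated failure before the credit")
        accounts[dst] += amount          # credit the other party
    except Exception:
        # Roll back: discard the partial debit and restore the snapshot.
        accounts.clear()
        accounts.update(snapshot)
        return False
    return True
```

With a simulated failure, the balances are left exactly as they were before the call; without it, both the debit and the credit take effect together.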
In yet another embodiment of the present application, the apparatus further includes a second adding unit and a commit unit. After the current transaction has been invoked and the records have been written into their corresponding topics according to the partition field, the second adding unit adds the current transaction to the set of transactions to be committed; the commit unit commits the transactions in that set in the order in which they were added. In this embodiment, committing the transactions in the order in which they were added makes it possible to learn promptly whether each transaction was executed successfully.
In a specific embodiment of the present application, after the checkpoint completes, Flink calls back the connector's notifyCheckpointComplete() method, and the commit() method is called to commit the pending transactions one by one.
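The interplay between the current transaction, the set of transactions to be committed, and the ordered commit can be sketched as follows. This is a plain-Python simulation under stated assumptions, not Flink code: the class names Transaction and TwoPhaseSink are hypothetical, the method names only mirror the Flink callbacks mentioned in the text, and keying the pending set by checkpoint id is an assumption made for the sketch.

```python
from collections import OrderedDict


class Transaction:
    """Hypothetical stand-in for a Kafka transaction."""

    def __init__(self, txn_id):
        self.txn_id = txn_id
        self.records = []
        self.state = "open"  # open -> pre_committed -> committed


class TwoPhaseSink:
    """Simulates the pending-transaction bookkeeping of a two-phase-commit sink."""

    def __init__(self):
        self._next_id = 0
        self.pending = OrderedDict()  # checkpoint id -> Transaction, in insertion order
        self.committed = []           # transaction ids in commit order
        self.current = self._begin_transaction()

    def _begin_transaction(self):
        txn = Transaction(self._next_id)
        self._next_id += 1
        return txn

    def invoke(self, record):
        # Every incoming record is written inside the current transaction.
        self.current.records.append(record)

    def snapshot_state(self, checkpoint_id):
        # Pre-commit the current transaction, park it, and start a new one.
        self.current.state = "pre_committed"
        self.pending[checkpoint_id] = self.current
        self.current = self._begin_transaction()

    def notify_checkpoint_complete(self, checkpoint_id):
        # Commit parked transactions one by one, oldest first.
        for cp_id in list(self.pending):
            if cp_id > checkpoint_id:
                break
            txn = self.pending.pop(cp_id)
            txn.state = "committed"
            self.committed.append(txn.txn_id)
```

Because the pending set preserves insertion order, a late completion notification still commits earlier transactions before later ones.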
To generate the records simply and efficiently, in another embodiment of the present application, the generation unit includes an acquisition module and a processing module. The acquisition module obtains the task configuration file that corresponds to the task code of the target data; the task configuration file specifies an output field determination mode and a partition mode. The processing module processes the target data according to the output field determination mode and the partition mode to generate the records.
In a specific embodiment of the present application, when a record arrives, the connector's invoke() method is called; in invoke(), multiple records are generated in one-to-one correspondence with multiple topics, and the current transaction is used to send them.
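The fan-out performed in invoke() can be sketched as below. This is a plain-Python illustration, not the patent's implementation or the Kafka client API: FakeTransaction, build_records, and the dictionary keys partition_field and payload are hypothetical names, and the sketch assumes the partition field simply carries the name of the target topic.

```python
class FakeTransaction:
    """Hypothetical stand-in for a transaction: buffers (topic, payload) pairs."""

    def __init__(self):
        self.sent = []

    def send(self, topic, payload):
        self.sent.append((topic, payload))


def build_records(data, topics):
    """Create one record per topic; the partition field names the target topic."""
    return [{"partition_field": topic, "payload": dict(data)} for topic in topics]


def invoke(data, topics, txn):
    """Write every generated record to its topic inside the same transaction."""
    for record in build_records(data, topics):
        txn.send(record["partition_field"], record["payload"])
```

Sending all records through one transaction object is what lets a single connector serve several topics; each record gets its own copy of the payload so later per-topic changes cannot leak between topics.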
In practice, the task configuration file may be a yaml task configuration file.
In still another embodiment of the present application, the output field determination mode is either to output all fields or to output a specified subset of fields. This allows the target data to be processed flexibly according to the output field determination mode and further ensures that the records are obtained efficiently.
Specifically, the all_fields node indicates whether all fields are output. When all_fields is true, all fields are output; when all_fields is false, only a specified subset of fields is output, and the optional_fields node can be configured to specify which fields to output.
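The effect of all_fields and optional_fields can be sketched in a few lines. This is a minimal illustration that represents the parsed task configuration as a Python dict; the function name select_fields is hypothetical, and treating a missing all_fields node as true and silently skipping unknown field names are assumptions, since the text does not say how those cases are handled.

```python
def select_fields(row, config):
    """Return the output fields of `row` according to the task configuration."""
    # Assumption: a missing all_fields node behaves like all_fields: true.
    if config.get("all_fields", True):
        return dict(row)
    # Assumption: names listed in optional_fields but absent from the row are skipped.
    wanted = config.get("optional_fields", [])
    return {name: row[name] for name in wanted if name in row}
```

For example, with all_fields set to false and optional_fields set to ["acct", "amt"], a row with fields acct, amt, and memo is reduced to acct and amt.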
To allow the target data to be processed flexibly according to the partition mode when generating the records, in one embodiment of the present application, the partition mode is either a custom partition mode or a default partition mode.
Specifically, the partition_field node and the partition_class node are used to configure custom partitioning; configuring either one of them is sufficient to enable a custom partition mode. When neither partition_field nor partition_class is configured, the default partition mode, round-robin, is used.
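The choice among the partition modes can be sketched as a small factory function. This is a plain-Python illustration under several assumptions that the text does not confirm: that partition_field partitions by a hash of the named field's value, that partition_class names a partitioner looked up in a registry, and that partition_class takes precedence when both nodes are present. The names make_partitioner and custom_partitioners are hypothetical.

```python
import itertools
import zlib


def make_partitioner(config, num_partitions, custom_partitioners=None):
    """Return a function mapping a record (dict) to a partition index."""
    custom_partitioners = custom_partitioners or {}

    # Assumption: partition_class names a custom partitioner in a registry.
    if "partition_class" in config:
        return custom_partitioners[config["partition_class"]]

    # Assumption: partition_field means "hash the value of this field".
    if "partition_field" in config:
        field = config["partition_field"]

        def by_field(record):
            key = str(record[field]).encode("utf-8")
            return zlib.crc32(key) % num_partitions

        return by_field

    # Neither node configured: default round-robin.
    counter = itertools.count()

    def round_robin(record):
        return next(counter) % num_partitions

    return round_robin
```

zlib.crc32 is used instead of the built-in hash() so that the same field value maps to the same partition across processes.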
The topic writing apparatus includes a processor and a memory. The acquisition unit, the generation unit, the writing unit, and the other units described above are stored in the memory as program units, and the processor executes the program units stored in the memory to implement the corresponding functions.
The processor contains one or more cores, and a core fetches the corresponding program unit from the memory. The prior-art problem that multi-topic writing occupies too many resources is solved by adjusting the core parameters.
The memory may include computer-readable media in the form of volatile memory, such as random access memory (RAM), and/or non-volatile memory, such as read-only memory (ROM) or flash memory (flash RAM). The memory includes at least one memory chip.
An embodiment of the present invention provides a computer-readable storage medium on which a program is stored; when executed by a processor, the program implements the topic writing method described above.
An embodiment of the present invention provides a processor configured to run a program, wherein the program, when run, performs the topic writing method described above.
A typical embodiment of the present application further provides a stream processing platform that includes a connector, one or more processors, a memory, a display device, and one or more programs. The connector is used to write records into topics. The one or more programs are stored in the memory and configured to be executed by the one or more processors, and they include instructions for performing any of the methods described above.
The stream processing platform can perform any of the methods described above. In the topic writing method, first, the incoming data of the stream processing platform is acquired to obtain target data; then, a plurality of records is generated from the target data, each record including a partition field that indicates the correspondence between the record and a topic of the stream processing platform; finally, the current transaction is invoked and, according to the partition field, the records are written into their corresponding topics, the records corresponding one-to-one with the topics. By invoking the current transaction, the method writes multiple records into their corresponding topics. Because the partition field indicates which topic each record belongs to, a single connector suffices to write to multiple topics, and one connector needs to create only one node. Compared with the prior art, which creates one node per topic, this greatly reduces resource usage and solves the prior-art problem that multi-topic writing occupies too many resources.
To help those skilled in the art understand the technical solution of the present application more clearly, it is described below with reference to a specific embodiment:
Embodiment
The present application can support writing to multiple topics through a custom Kafka connector, which may be named MultiTopicFlinkKafkaProducer. The classes and interfaces involved play the following roles. CheckpointedFunction interface: used to perform checkpoint operations; its initializeState() and snapshotState() methods initialize the state and snapshot the state, respectively. CheckpointListener interface: used to listen for checkpoint events; notifyCheckpointComplete() and notifyCheckpointAborted() are called when a checkpoint completes successfully and when it is aborted, respectively. RichSinkFunction class: the rich-function variant of a sink, which provides lifecycle methods; open() and close() are called when the operator is created and destroyed, respectively. TwoPhaseCommitSinkFunction class: an abstract class that encapsulates the template logic of two-phase commit; a subclass implements its own transaction commit logic by implementing the beginTransaction(), preCommit(), and commit() methods.
As shown in Figure 3, the connector MultiTopicFlinkKafkaProducer extends the TwoPhaseCommitSinkFunction abstract class, and its processing logic is as follows. First, checkpoint processing: when Flink initiates a checkpoint, it calls the connector's snapshotState() method to take a snapshot; in the preCommit() method, the current transaction is added to the set of transactions to be committed, and the beginTransactionInternal() method is called to generate a new current transaction. Second, record processing: when a record arrives, the Kafka connector's invoke() method is called; in invoke(), multiple records are generated in one-to-one correspondence with multiple topics, and the current transaction is used to send them. When the records are created, the task code is read from the incoming data, the corresponding yaml task configuration file is read according to the task code, and the records are generated according to the output field determination mode and the partition mode defined in that file. Finally, the callback after checkpoint completion: after the checkpoint completes, Flink calls back the connector's notifyCheckpointComplete() method, and the commit() method is called to commit the pending transactions one by one.
An embodiment of the present invention provides a device that includes a processor, a memory, and a program stored in the memory and executable on the processor. When executing the program, the processor implements at least the following steps:
Step S101: acquire the incoming data of the stream processing platform to obtain target data;
Step S102: generate a plurality of records from the target data, each record including a partition field that indicates the correspondence between the record and a topic of the stream processing platform;
Step S103: invoke the current transaction and, according to the partition field, write the records into their corresponding topics, the records corresponding one-to-one with the topics.
The device referred to here may be a server, a PC, a tablet (PAD), a mobile phone, or the like.
The present application further provides a computer program product which, when executed on a data processing device, is adapted to execute a program that performs at least the following method steps:
Step S101: acquire the incoming data of the stream processing platform to obtain target data;
Step S102: generate a plurality of records from the target data, each record including a partition field that indicates the correspondence between the record and a topic of the stream processing platform;
Step S103: invoke the current transaction and, according to the partition field, write the records into their corresponding topics, the records corresponding one-to-one with the topics.
In the embodiments of the present invention described above, the description of each embodiment has its own emphasis. For parts not described in detail in one embodiment, refer to the related descriptions of the other embodiments.
It should be understood that the technical content disclosed in the embodiments provided in this application can be implemented in other ways. The apparatus embodiments described above are merely illustrative. For example, the division into units may be a division by logical function, and other divisions are possible in an actual implementation: multiple units or components may be combined or integrated into another system, or some features may be omitted or not performed. In addition, the couplings, direct couplings, or communication connections shown or discussed may be indirect couplings or communication connections through interfaces, units, or modules, and may be electrical or take other forms.
The units described as separate components may or may not be physically separate, and components shown as units may or may not be physical units; that is, they may be located in one place or distributed across multiple units. Some or all of the units may be selected according to actual needs to achieve the purpose of the solution of this embodiment.
In addition, the functional units in the embodiments of the present invention may be integrated into one processing unit, each unit may exist physically on its own, or two or more units may be integrated into one unit. The integrated unit may be implemented in the form of hardware or in the form of a software functional unit.
If the integrated unit is implemented in the form of a software functional unit and sold or used as an independent product, it may be stored in a computer-readable storage medium. Based on this understanding, the technical solution of the present invention in essence, or the part of it that contributes over the prior art, or all or part of the technical solution, may be embodied in the form of a software product. The computer software product is stored in a computer-readable storage medium and includes instructions that cause a computer device (which may be a personal computer, a server, a network device, or the like) to perform all or some of the steps of the methods of the embodiments of the present invention. The computer-readable storage medium includes any medium that can store program code, such as a USB flash drive, read-only memory (ROM), random access memory (RAM), a removable hard disk, a magnetic disk, or an optical disc.
As can be seen from the above description, the embodiments of the present application achieve the following technical effects:
1) In the topic writing method of the present application, first, the incoming data of the stream processing platform is acquired to obtain target data; then, a plurality of records is generated from the target data, each record including a partition field that indicates the correspondence between the record and a topic of the stream processing platform; finally, the current transaction is invoked and, according to the partition field, the records are written into their corresponding topics, the records corresponding one-to-one with the topics. By invoking the current transaction, the method writes multiple records into their corresponding topics. Because the partition field indicates which topic each record belongs to, a single connector suffices to write to multiple topics, and one connector needs to create only one node. Compared with the prior art, which creates one node per topic, this greatly reduces resource usage and solves the prior-art problem that multi-topic writing occupies too many resources.
2) In the topic writing apparatus of the present application, the acquisition unit acquires the incoming data of the stream processing platform to obtain target data; the generation unit generates a plurality of records from the target data, each record including a partition field that indicates the correspondence between the record and a topic of the stream processing platform; and the writing unit invokes the current transaction and, according to the partition field, writes the records into their corresponding topics, the records corresponding one-to-one with the topics. The apparatus achieves the same effect as the method in 1): a single connector, and therefore a single node, writes to multiple topics, which greatly reduces resource usage compared with the prior art's one node per topic.
3) The stream processing platform of the present application can perform any of the methods described above and therefore achieves the same effect as the method in 1): the records generated from the incoming data are written into their corresponding topics through one connector within the current transaction, which greatly reduces resource usage compared with the prior art's one node per topic and solves the prior-art problem that multi-topic writing occupies too many resources.
The above describes only preferred embodiments of the present application and is not intended to limit it. Those skilled in the art may make various modifications and changes to the present application. Any modification, equivalent replacement, or improvement made within the spirit and principles of the present application shall fall within its scope of protection.
Claims (10)
Priority Applications (1)
| Application Number | Priority Date | Filing Date | Title |
|---|---|---|---|
| CN202111633573.4A CN114327800A (en) | 2021-12-28 | 2021-12-28 | Method and device for writing in topic, processor and stream processing platform |
Publications (1)
| Publication Number | Publication Date |
|---|---|
| CN114327800A true CN114327800A (en) | 2022-04-12 |
Family
ID=81014962
Family Applications (1)
| Application Number | Title | Priority Date | Filing Date |
|---|---|---|---|
| CN202111633573.4A Pending CN114327800A (en) | 2021-12-28 | 2021-12-28 | Method and device for writing in topic, processor and stream processing platform |
Country Status (1)
| Country | Link |
|---|---|
| CN (1) | CN114327800A (en) |
Legal Events
| Date | Code | Title | Description |
|---|---|---|---|
| | PB01 | Publication | |
| | SE01 | Entry into force of request for substantive examination | |