kafka生产者

news/发布时间2024/5/18 13:39:22

1.原理

2.普通异步发送

引入pom:

    <dependencies><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>3.0.0</version></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId><version>1.7.25</version></dependency></dependencies>
package com.atguigu.kafka.producer;import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;import java.util.Properties;public class CustomProducer {public static void main(String[] args) {// 1. 创建 kafka 生产者的配置对象Properties properties = new Properties();// 2. 给 kafka 配置对象添加配置信息:bootstrap.serversproperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop100:9092");// key,value 序列化(必须):key.serializer,value.serializerproperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");// 3. 创建 kafka 生产者对象KafkaProducer<String, String> kafkaProducer = newKafkaProducer<String, String>(properties);// 4. 调用 send 方法,发送消息for (int i = 0; i < 5; i++) {kafkaProducer.send(newProducerRecord<>("first","atguigu " + i));}// 5. 关闭资源kafkaProducer.close();}
}

测试效果:

3.带回调的异步发送

回调的信息实际是从队列返回的

 

4.同步发送 

只需在异步发送的基础上,再调用一下 get()方法即可。

5.分区

6.分区策略 

指定key的值:对key的hashcode做分配

希望将订单表的数据全部发到kafka的一个分区上,怎么处理?

将该表的名称作为key值然后发送即可 
如:

7.自定义分区 (脏数据的处理)

如果研发人员可以根据企业需求,自己重新实现分区器。 1)需求 例如我们实现一个分区器实现,发送过来的数据中如果包含 atguigu,就发往 0 号分区, 不包含 atguigu,就发往 1 号分区。 

自定义分区器:

package com.atguigu.kafka.producer;import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;import java.util.Map;public class MyPartitioner implements Partitioner {@Overridepublic int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {String s = value.toString();int partion;if(s.contains("atguigu")) {partion = 1;}else {partion=0;}return partion;}@Overridepublic void close() {}@Overridepublic void configure(Map<String, ?> map) {}
}

 拷贝全类名,产生关联

测试结论:

 

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.bcls.cn/ZVVE/2830.shtml

如若内容造成侵权/违法违规/事实不符,请联系编程老四网进行投诉反馈email:xxxxxxxx@qq.com,一经查实,立即删除!

相关文章

恒峰智慧-智能高压森林应急消防泵:森林灭火的得力助手

在大自然中&#xff0c;森林是生态系统中的重要组成部分。它们不仅为生物多样性提供了宝贵的栖息地&#xff0c;还通过吸收二氧化碳和其他温室气体&#xff0c;帮助调节地球的气候。然而&#xff0c;当森林火灾发生时&#xff0c;这些宝贵的资源可能会在瞬间化为灰烬。因此&…

开源软件的利弊

目录 开源软件 优势 免费 透明 可更改 可协作 影响力 坏处 安全隐患 良莠不齐 学习成本 持续性问题 未知风险 开源软件 开源软件是一种基于开放协作和共享的软件开发模式&#xff0c;其利弊对于软件产业和社会发展具有重要意义 优势 免费 谁能拒绝不要钱的东西…

企业安全建设工具推荐

全自动化挖洞&#xff0c;助力企业安全建设&#xff0c;一键实现域名扫描、IP 发现、端口扫描、服务识别、网站识别、漏洞探测、分析发现、合规检查。 使用方式&#xff1a; 录入目标企业名称即可开始使用 技术细节&#xff1a; 第一步&#xff1a;通过企业主体关联企业备案…

超级实用的python代码片段汇总和详细解析(16个)

目录 1. 生成随机文本 2. 计算文本文件中的字数 3. 替换文件文件中的字串 4. 多文件名的批量替换 5. 从网站提取数据 6. 批量下载图片 7.批量删除空文件夹 8.Excel表格读写 9.合并Excel表格工作簿 10.数据库SQL查询 11. 系统进程查杀 12.图像尺寸调整和裁剪 13.图…

cocos creator3.x项目打包成aar 加入到已有的Android工程

Cocos crearor版本&#xff1a; 3.4.2 Android Studio Flamingo | 2022.2.1 Patch 2 1、配置构建安卓项目 2、 运行编译无报错 出现问题可尝试修改Gradle版本 修改jdk版本 3、对libservice打包成aar 打包完后 再build/outputs找到aar 如果看不到Tasks模块&#xff0c;在Fil…

2.网络游戏逆向分析与漏洞攻防-游戏启动流程漏洞-项目搭建

内容参考于&#xff1a;易道云信息技术研究院VIP课 上一个内容&#xff1a;1.网络游戏逆向分析与漏洞攻防-游戏启动流程漏洞-测试需求与需求拆解-CSDN博客 代码以提交到码云&#xff1a;https://gitee.com/dye_your_fingers/titan 开始搭建环境 创建之前&#xff1a;HOOK引擎…

【js】无限虚拟列表的原理及实现

什么是虚拟列表 虚拟列表是长列表按需显示思路的一种实现&#xff0c;即虚拟列表是一种根据滚动容器元素的可视区域来渲染长列表数据中某一个部分数据的技术。 简而言之&#xff0c;虚拟列表指的就是「可视区域渲染」的列表。有三个概念需要了解一下&#xff1a; 视口容器元…

Elasticsearch从入门到精通-01认识Elasticsearch

Elasticsearch从入门到精通-01认识Elasticsearch &#x1f44f;作者简介&#xff1a;大家好&#xff0c;我是程序员行走的鱼 &#x1f342;博主从本篇正式开始ES学习&#xff0c;希望小伙伴可以一起探讨 &#x1f4d6; 本篇主要介绍和大家一块简单认识下ES并了解ES中的主要角色…

华清远见作业第三十九天——Qt(第一天)

思维导图&#xff1a; 登录界面&#xff1a; 代码&#xff1a; #include "mainwindow.h" #include<QToolBar> #include<QPushButton> MainWindow::MainWindow(QWidget *parent): QMainWindow(parent) {this->resize(600,400);this->setFixedSize…

用于将Grafana默认数据库sqlite3迁移到MySQL数据库

以下是一个方案&#xff0c;用于将Grafana数据迁移到MySQL数据库。 背景: grafana 默认采用的是sqlite3&#xff0c;当我们要以集群形式部署的时使用mysql较为方便&#xff0c;试了很多sqlite转mysql的方法要么收费,最后放弃。选择自己动手风衣足食。 目标: 迁移sqlite3切换…

11.CSS3的媒介(media)查询

CSS3 的媒介(media)查询 经典真题 如何使用媒体查询实现视口宽度大于 320px 小于 640px 时 div 元素宽度变成 30% 媒体查询 媒体查询英文全称 Media Query&#xff0c;顾名思义就是会查询用户所使用的媒体或者媒介。 在现在&#xff0c;网页的浏览终端是越来越多了。用户可…

ChatGPT:开启智能新纪元的里程碑

在人工智能领域&#xff0c;每一次技术的飞跃都预示着一次新的革命。而ChatGPT的出现&#xff0c;无疑是这场革命中的一道亮丽风景线。它不仅代表了当前人工智能语言处理能力的巅峰&#xff0c;更标志着一个全新智能时代的开端。那么&#xff0c;我们是否可以将其誉为“未来人工…

C++ bfs建模(六十一)【第八篇】

今天我们来学习一下bfs建模 1.bfs建模 BFS 建模 广度优先搜索是从某个起点由内向外逐层搜索&#xff08;多起点 BFS 中所有起点拥有同样的搜索特性&#xff09;。 搜索顺序为&#xff1a;1→2→3→4→5→6→7→8。 之前的课程中&#xff0c;我们学习过搜索树中的每个结点均表…

C语言第二十七弹---内存函数

✨个人主页&#xff1a; 熬夜学编程的小林 &#x1f497;系列专栏&#xff1a; 【C语言详解】 【数据结构详解】 内存函数 1、memcpy 使用和模拟实现 2、memmove 使用和模拟实现 3、memset 函数的使用 4、memcmp 函数的使用 总结 前面两弹讲解了字符函数和字符串函数&…

http前生今世

HTTP/0.9&#xff0c;仅支持GET方法&#xff0c;并且响应中没有HTTP头信息&#xff0c;只有文档内容。 HTTP/1.0增加了对POST方法、状态码、HTTP头信息等的支持&#xff0c;这一版本也是广泛应用的历史性版本。 HTTP/1.1引入了持久连接&#xff08;Persistent Connections&…

时间函数,SQL获取,实际案例,简单易懂

时间函数&#xff1a; TODAY(): 返回当前日期 DATE():返回表示特定日期的连续序列号 Time&#xff1a;返回指定时间的序列号。 Time("时","分","秒") EOMONTH():以日期/时间格式返回指定月份数之前或之后的月份的最后一天的日期 EOMONTH(…

SparkSQL学习02-编程入口

文章目录 1 DataFrame的构建方式方式一&#xff1a;JavaBean反射的方式1.1 创建Scala类1.2 创建Scala对象 方式二&#xff1a;动态编码的方式 2 DataSet的构建方式3 RDD和DataFrame以及DataSet之间的相互转换3.1【RDD-->DataFrame】和【RDD-->DataSet】3.2【DataFrame--&…

Matlab/simulink光伏发电的恒定电压法MPPT仿真(持续更新)

1.光伏发电的电导增量法MPPT仿真 2.光伏发电的恒定电压法MPPT仿真 3.光伏发电的扰动观察法MPPT仿真 4.光伏发电的占空比法MPPT仿真 5.基于神经网络的MPPT光伏发电仿真 6. 基于模糊控制的MPPT光伏发电仿真 7. 基于粒子群算法&#xff08;PSO&#xff09;的500w光伏系统MPPT控…

【2024.02.23】定时执行专家V7.0最新版GUI界面 - 基于wxWidgets 3.2.4,开发工具CodeBlocks + GCC9.2.0

目录 ◆ V7.0 更新日志 ◆ 主窗口 ◆ 关于对话框 ◆ 任务对话框 ◆ 触发器对话框 ◆ 设置对话框 ◆ 选择语言对话框 ◆ V7.0 更新日志 定时执行专家V7.0 龙年春节版 ★★★★★★★★★★★★★★★★★★★★ ▼2024-02-22 V7.0 - 更新日志▼ - 增加 执行记录功能&am…

一文带你了解如何在Java中操作Redis

文章目录 前言发现宝藏一、 Redis客户端简介1. Redis客户端分类2. Spring 整合 Redis 的两种方式 二、 使用 Jedis 操作 Redis1. Jedis的maven坐标2. 使用Jedis操作Redis的步骤3. Jedis 操作 Redis 示例 三、 使用 Spring Data Redis 操作 Redis1. Spring Data Redis 的 maven …
推荐文章