《大數(shù)據(jù)技術(shù)》課件(共九章-上)_第1頁
《大數(shù)據(jù)技術(shù)》課件(共九章-上)_第2頁
《大數(shù)據(jù)技術(shù)》課件(共九章-上)_第3頁
《大數(shù)據(jù)技術(shù)》課件(共九章-上)_第4頁
《大數(shù)據(jù)技術(shù)》課件(共九章-上)_第5頁
已閱讀5頁,還剩162頁未讀, 繼續(xù)免費閱讀

下載本文檔

版權(quán)說明:本文檔由用戶提供并上傳,收益歸屬內(nèi)容提供方,若內(nèi)容存在侵權(quán),請進行舉報或認領(lǐng)

文檔簡介

1.大數(shù)據(jù)技術(shù)概述牛津大學教授維克托·邁爾-舍恩伯格(ViktorMayer-Sch?nberger)指出,大數(shù)據(jù)帶來的信息風暴正在改變我們的生活、工作和思維。理解大數(shù)據(jù)并對這些數(shù)據(jù)進行有效的處理和分析是企業(yè)和政府的機遇,更是一種挑戰(zhàn)。數(shù)據(jù)流的處理必須滿足高吞吐和低延遲的特性,ApacheFlink(以下簡稱Flink)是一種針對數(shù)據(jù)流的大數(shù)據(jù)處理框架。讀完本章之后,讀者可以了解以下內(nèi)容。大數(shù)據(jù)的特點、大數(shù)據(jù)分而治之的處理思想。批處理和流處理的區(qū)別。流處理的基礎概念。流處理框架的技術(shù)更迭和架構(gòu)演進。Flink開發(fā)的常用編程語言。什么是大數(shù)據(jù)從批處理到流處理代表性大數(shù)據(jù)技術(shù)從Lambda到Kappa流式處理基礎概念編程語言的選擇大數(shù)據(jù)的5個VVolume:數(shù)據(jù)量大Velocity:數(shù)據(jù)產(chǎn)生速度快Variety:數(shù)據(jù)類型繁多Veracity:數(shù)據(jù)真實性Value:數(shù)據(jù)價值大數(shù)據(jù)單臺計算機無法處理所有數(shù)據(jù),使用多臺計算機組成集群,進行分布式計算。分而治之:將原始問題分解為多個子問題多個子問題分別在多臺計算機上求解將子結(jié)果匯總比較經(jīng)典的模式和框架:MPIMapReduce大數(shù)據(jù)分而治之MPI:MessagePassingInterface消息傳遞接口使用分治法將問題分解成子問題,在不同節(jié)點上分而治之地求解。MPI提供數(shù)據(jù)發(fā)送和數(shù)據(jù)接收操作:將本進程中某些數(shù)據(jù)發(fā)送給其他進程接收其他進程的數(shù)據(jù)自行設計分治算法,將復雜問題分解為子問題優(yōu)勢:以很細的粒度控制數(shù)據(jù)的通信劣勢:難度大,開發(fā)調(diào)試時間成本高MPI程序員只需要定義兩個操作:Map和Reduce案例:三明治制作Map階段將原材料在不同的節(jié)點上分別進行處理Shuffle/Group階段將不同的中間食材進行組合Reduce階段最終將一組中間食材組合成三明治成品學習門檻比MPI低MapReduce什么是大數(shù)據(jù)從批處理到流處理代表性大數(shù)據(jù)技術(shù)從Lambda到Kappa流式處理基礎概念編程語言的選擇單條數(shù)據(jù)被稱為事件(Event)或者被稱為一條數(shù)據(jù)或一個元素。事件按照時序排列會形成一個數(shù)據(jù)流(Data

Stream)。數(shù)據(jù)流一般是無界(Unbounded)的,某段有界數(shù)據(jù)流(BoundedDataStream)可以組成一個數(shù)據(jù)集。數(shù)據(jù)與數(shù)據(jù)流批處理(BatchProcessing):對一批數(shù)據(jù)進行處理案例:微信運動統(tǒng)計步數(shù),銀行信用卡賬單統(tǒng)計…

數(shù)據(jù)總量大,計算非常耗時流處理數(shù)據(jù)本質(zhì)上是流,流處理(StreamProcessing)對數(shù)據(jù)流進行處理案例:查看電商實時銷售業(yè)績、股票交易…批處理與流處理流處理一般使用生產(chǎn)者-消費者模型股票交易案例:輔助人工決策實現(xiàn)消費者側(cè)代碼,以10秒為一個時間窗口,統(tǒng)計窗口內(nèi)的交易情況可擴展性:隨著數(shù)據(jù)不斷增多,能否保證我們的程序能夠快速擴展到更多的節(jié)點上。數(shù)據(jù)傾斜:數(shù)據(jù)沒有均勻分布到分布式系統(tǒng)各個節(jié)點上。容錯性:系統(tǒng)崩潰重啟后,之前的那些計算如何恢復。時序錯亂:數(shù)據(jù)到達的時間和實際發(fā)生的時間是不一致的,有一定的延遲,需要設計等待策略。Flink:為流處理而生。流處理框架必要性生產(chǎn)者-消費者模型什么是大數(shù)據(jù)從批處理到流處理代表性大數(shù)據(jù)技術(shù)從Lambda到Kappa流式處理基礎概念編程語言的選擇MapReduce編程模型的一種實現(xiàn),逐漸形成了一整套生態(tài)圈。主要組件:HadoopMapReduce:數(shù)據(jù)處理模型,面向批處理。HDFS:分布式文件系統(tǒng),提供存儲支持。YARN:資源調(diào)度器,分配計算資源。其他著名組件:Hive:SQL-on-HadoopHbase:基于HDFS的分布式數(shù)據(jù)庫,毫秒級實時查詢Kafka:消息隊列ZooKeeper:分布式環(huán)境的協(xié)調(diào)HadoopHadoop生態(tài)圈Spark初衷:改良HadoopMapReduce的編程模型,提高運行速度,優(yōu)化機器學習性能。易用性:比MapReduce更好用,提供了多種編程語言API,支持SQL、機器學習和圖計算。速度快:盡量將計算放在內(nèi)存中。完美融入進Hadoop生態(tài)圈。流處理:SparkStreaming,mini-batch思想,將輸入數(shù)據(jù)流拆分成多個批次。Spark是一個批流一體的計算框架。SparkSpark生態(tài)圈Spark

mini-batch流處理消息隊列:數(shù)據(jù)集成和系統(tǒng)解耦,某個應用系統(tǒng)專注于一個目標。企業(yè)將各個子系統(tǒng)獨立出來,子系統(tǒng)之間通過消息隊列來發(fā)送數(shù)據(jù)。Kafka

Kafka可以連接多個組件和系統(tǒng)主要面向流處理流處理框架經(jīng)歷了三代演進StormSpark

StreamingFlink事件投遞保障:Exactly-Once:一條數(shù)據(jù)只影響一次最終結(jié)果毫秒級的延遲Flink什么是大數(shù)據(jù)從批處理到流處理代表性大數(shù)據(jù)技術(shù)從Lambda到Kappa流式處理基礎概念編程語言的選擇Lambda架構(gòu):批處理層、流處理層、在線服務層批處理層:等待一個批次數(shù)據(jù),使用批處理框架計算,得到一個非實時的結(jié)果。比如,凌晨0點開始統(tǒng)計前一天所有商品的計算次數(shù),計算需要幾個小時。流處理層:使用流處理框架生成結(jié)果。早期的流處理框架不成熟,結(jié)果近似準確。在線服務層:將來自批處理層準確但有延遲的預處理結(jié)果和流處理層實時但不夠準確的預處理結(jié)果做融合。程序員需要維護批處理和流處理兩套業(yè)務邏輯。Lambda架構(gòu)Lambda架構(gòu)Kafka等消息隊列可以保存更長時間的歷史數(shù)據(jù),它不僅起到消息隊列的作用,也可以存儲數(shù)據(jù),替代數(shù)據(jù)倉庫。Flink流處理框架解決了事件亂序下計算結(jié)果的準確性問題。程序員只維護一套流處理層,維護成本低。Kappa架構(gòu)Kappa架構(gòu)什么是大數(shù)據(jù)從批處理到流處理代表性大數(shù)據(jù)技術(shù)從Lambda到Kappa流式處理基礎概念編程語言的選擇延遲:一個事件被系統(tǒng)處理的總時間。案例:自助食堂,一位用餐者從進入食堂到離開食堂的總耗時。高峰時,總耗時會增加。分位延遲更能反映系統(tǒng)的性能。吞吐:系統(tǒng)最大能處理多少事件。與系統(tǒng)本身設計有關(guān),也與數(shù)據(jù)源的數(shù)據(jù)量有關(guān)。延遲與吞吐相互影響,一起反映了系統(tǒng)的性能。優(yōu)化方式:優(yōu)化單節(jié)點內(nèi)的計算速度,使用并行策略,分而治之地處理數(shù)據(jù)。延遲和吞吐延遲和吞吐更直觀的表現(xiàn):用戶是否排隊。滾動窗口(TumblingWindow):定義一個固定的窗口長度,長度是一個時間間隔。滑動窗口(SlidingWindow):定義一個固定的窗口長度和一個滑動長度。會話窗口(SessionWindow):窗口長度不固定,根據(jù)會話間隔(SessionGap)確定窗口,兩個事件之間的間隔大于SessionGap,則兩個事件被劃分到不同的窗口中。窗口三種時間窗口EventTime:事件實際發(fā)生的時間事件發(fā)生時,EventTime就已經(jīng)確定ProcessingTime:事件被流處理框架處理的時間不同節(jié)點、系統(tǒng)內(nèi)不同模塊、同一數(shù)據(jù)不同次處理都會產(chǎn)生不同的ProcessingTime案例:手機游戲,用戶需要與服務器實時交互,游戲根據(jù)實時數(shù)據(jù)計分。信號丟失,部分數(shù)據(jù)上傳有延遲,使用事件的Event

Time更準確。時間Watermark是插入到數(shù)據(jù)流的元素。Watermark元素到達,假設不會有比這個時間點更晚的上報數(shù)據(jù)。可以設置不同的Watermark策略,是一種折中方案:Watermark等待時間短,保證低延遲,數(shù)據(jù)準確性下降。Watermark等待時間長,數(shù)據(jù)更準確,延遲高,維護難度大。Watermark無狀態(tài):流處理中,不需要額外信息,給定一個輸入數(shù)據(jù),直接得到輸出。將英文單詞轉(zhuǎn)化為小寫。有狀態(tài):根據(jù)歷史信息,處理新流入數(shù)據(jù)。統(tǒng)計一分鐘內(nèi)單詞出現(xiàn)次數(shù),需要保存已經(jīng)進入系統(tǒng)的歷史。使用檢查點(Checkpoint)技術(shù),將狀態(tài)數(shù)據(jù)保存下來,用于故障后的恢復。狀態(tài)與檢查點有狀態(tài)計算和無狀態(tài)計算如果發(fā)生故障,數(shù)據(jù)是否被成功處理?At-Most-Once:每個事件最多被處理一次。有些數(shù)據(jù)被丟棄,最不安全。At-Least-Once:每個事件至少被處理一次,有些事件可能被處理多次。部分數(shù)據(jù)被處理多次,可能不準確。Exactly-Once:每個事件只被處理一次。事件不丟不重。實現(xiàn)難度最大。數(shù)據(jù)一致性保障什么是大數(shù)據(jù)從批處理到流處理代表性大數(shù)據(jù)技術(shù)從Lambda到Kappa流式處理基礎概念編程語言的選擇Java企業(yè)級編程語言有很多開源包大數(shù)據(jù)必備Scala函數(shù)式編程有一定學習門檻Flink目前絕大多數(shù)代碼和功能均由Java實現(xiàn)編程語言的選擇Python簡單易用PyFlinkSQL上手門檻很低大數(shù)據(jù)數(shù)據(jù)量大、產(chǎn)生速度快、類型多,為了獲取數(shù)據(jù)背后價值還需要注意數(shù)據(jù)的真實性。業(yè)界普遍基于分治法,使用分布式系統(tǒng)以應對各類技術(shù)挑戰(zhàn)。大數(shù)據(jù)生態(tài)圈歷經(jīng)十幾年發(fā)展,已經(jīng)日漸完善,不同技術(shù)分別面向存儲、計算和在線服務等不同的需求。隨著業(yè)界對數(shù)據(jù)實時性要求越來越高,數(shù)據(jù)處理正在由批處理向流處理發(fā)展。Flink提供了高吞吐、低延遲的性能,有效解決了狀態(tài)管理和故障恢復等流處理領(lǐng)域非常棘手的問題,并且Flink也提供批處理能力,是一款流處理與批處理一體的大數(shù)據(jù)處理引擎。2.大數(shù)據(jù)必備編程知識本章先回顧和復習一下必備的編程知識,了解這些編程知識有助于我們快速讀懂各類源碼,深刻理解FlinkAPI及其背后的原理。本書主要基于Java的相關(guān)知識,也會在必要的地方兼顧Scala的相關(guān)知識。本章所涉及的主要內(nèi)容。熟悉繼承和多態(tài)。了解泛型的使用和特點。了解函數(shù)式編程。繼承和多態(tài)泛型函數(shù)式編程案例:動物類(Animal)和魚類(Fish)繼承關(guān)系保證所有動物子類都具有動物類的屬性和方法子類有自己的屬性和方法。除了動物,還有很多其他事物也會移動,使用接口(interface)來抽象“移動”。繼承Java的繼承:繼承類extends

實現(xiàn)接口implements繼承publicclassClassAimplementsMove{@Overridepublicvoidmove(){...}}實現(xiàn)接口publicclassDogextendsAnimal

{

privateStringdogData;

publicDog(StringmyName,StringmyDescription,StringmyDogData){=myName;this.description=myDescription;this.dogData=myDogData}

}

繼承類interfaceclass重寫:子類和父類都定義同名方法,子類的方法會覆蓋父類中已有的方法。重載:多個同名方法,這些方法名字相同、參數(shù)不同、返回類型不同。重寫與重載publicclassClassAimplementsMove{@Overridepublicvoidmove(){...}}@Override

:在子類中重寫父類中的同名方法publicclassOverloading{

//無參數(shù),返回值為int類型publicinttest(){System.out.println("test");return1;}

//有一個參數(shù)publicvoidtest(inta){System.out.println("test"+a);}

//有兩個參數(shù)和一個返回值publicStringtest(inta,Strings){System.out.println("test"+a+""+s);returna+""+s;}}同名方法重載:一個類中多個方法都名為test,但是參數(shù)類型和返回值類型不同。繼承和多態(tài)泛型函數(shù)式編程案例:Java中的List和ArrayListArrayList是一個泛型類,List是一個泛型接口ArrayList泛型是一種集合容器,可以向這個集合容器中添加String、Double以及其他各類數(shù)據(jù)類型。沒必要創(chuàng)建StringArrayList、DoubleArrayList等類。泛型List<String>strList=newArrayList<String>();List<Double>doubleList=newLinkedList<Double>();類名后面加上<T>

類內(nèi)部的一些屬性和方法都可以使用泛型T泛型規(guī)范:T代表一般的任何類。E代表元素(Element)或異常(Exception)。 K或KEY代表鍵(Key)。 V代表值(Value),通常與K一起配合使用。Java泛型類publicclassMyArrayList<T>{

privateintsize;

T[]elements;

publicMyArrayList(intcapacity){

this.size=capacity;this.elements=(T[])newObject[capacity];}

publicvoidset(Telement,intposition){elements[position]=element;}

@OverridepublicStringtoString(){Stringresult="";for(inti=0;i<size;i++){result+=elements[i].toString();}returnresult;}

}

與泛型類類似,使用<>符號可以繼承并實現(xiàn)這個接口Java泛型接口publicinterfaceList<E>{...publicList<E>subList(intfromIndex,inttoIndex);}

publicclassArrayList<E>implementsList<E>{...publicList<E>subList(intfromIndex,inttoIndex){...//返回一個List<E>類型值}}

要實現(xiàn)的子類是泛型的

publicclassDoubleListimplementsList<Double>{...publicList<Double>subList(intfromIndex,inttoIndex){...//返回一個List<Double>類型值}}要實現(xiàn)的子類不是泛型的,而是有確定類型的

泛型方法可以存在于泛型類中,也可以存在于普通的類中。泛型方法的類型E和泛型類中的類型T可以不一樣。泛型方法是泛型類的一個成員,泛型方法既可以繼續(xù)使用類的類型T,也可以自己定義新的類型E。Java泛型方法publicclassMyArrayList<T>{...//public關(guān)鍵字后的<E>表明該方法是一個泛型方法//泛型方法中的類型E和泛型類中的類型T可以不一樣

public<E>EprocessElement(Eelement){...returnE;}}

Java泛型信息只存在于代碼編譯階段,當程序運行到JVM上時,與泛型相關(guān)的信息會被擦除。對于絕大多數(shù)應用系統(tǒng)開發(fā)者來說影響不太大,對于框架開發(fā)者來說,必須要注意。

類型擦除Class<?>strListClass=newArrayList<String>().getClass();Class<?>intListClass=newArrayList<Integer>().getClass();//輸出:classjava.util.ArrayListSystem.out.println(strListClass);//輸出:classjava.util.ArrayListSystem.out.println(intListClass);//輸出:trueSystem.out.println(strListClass.equals(intListClass));泛型擦除:無法區(qū)別strListClass和intListClass這兩個類型繼承和多態(tài)泛型函數(shù)式編程適合進行并行計算的一種編程范式非函數(shù)式編程:創(chuàng)建中間變量,分步執(zhí)行函數(shù)式編程

:與數(shù)學表達式更相似實現(xiàn)單個函數(shù),將零到多個輸入轉(zhuǎn)換成零到多個輸出。比如,add()

將兩個輸入轉(zhuǎn)化為一個輸出。將多個函數(shù)連接起來,實現(xiàn)所需業(yè)務邏輯。比如,將add()、multiply()連接到一起。函數(shù)式編程

addResult=x+yresult=addResult*z非函數(shù)式編程result=add(x,y).multiply(z)函數(shù)式編程Lambda表達式被一些編程語言用來實現(xiàn)函數(shù)式編程。一個箭頭符號

->

,兩邊連接著輸入?yún)?shù)和函數(shù)體。Lambda表達式(parameters)->{body}Java的Lambda表達式的語法規(guī)則

//接收2個int類型參數(shù),返回它們的和(intx,inty)->x+y//接收1個String類型參數(shù),將其輸出到控制臺,不返回任何值(Strings)->{System.out.print(s);}

//參數(shù)為圓半徑,返回圓面積,返回值為double類型(doubler)->{doublepi=3.1415;returnr*r*pi;}幾個Java

Lambda表達式案例輸入?yún)?shù):接收零到多個輸入?yún)?shù)程序員可以提供輸入類型,也可以不提供類型,讓代碼根據(jù)上下文去推斷參數(shù)可以放在圓括號()中,多個參數(shù)通過英文逗號,隔開函數(shù)體:可以有一到多行語句函數(shù)體有多行內(nèi)容,必須使用花括號{}

輸出的類型與所需要的類型相匹配Java

Lambda表達式Lambda表達式本質(zhì)是一種接口,它要實現(xiàn)一個函數(shù)式接口(FunctionalInterface)中的虛方法函數(shù)式接口是一種接口,并且它只有一個虛方法。@FunctionalInterface

注解函數(shù)式接口@FunctionalInterfaceinterfaceAddInterface<T>{Tadd(Ta,Tb);}

publicstaticclassMyAddimplementsAddInterface<Double>{@OverridepublicDoubleadd(Doublea,Doubleb){returna+b;}}如果沒有Lambda表達式(Integera,Integerb)->a+b;

使用Lambda表達式Java

8之后推出的,專注于對集合(Collection)對象的操作。右側(cè)案例:數(shù)據(jù)先經(jīng)過stream()方法被轉(zhuǎn)換為一個Stream類型,后經(jīng)過filter()、map()、collect()等處理邏輯,生成我們所需的輸出。各個操作之間使用英文點號.來連接,這種方式被稱作鏈式調(diào)用(MethodChaining)。鏈式調(diào)用:將多個函數(shù)連接起來。Flink的API是面向數(shù)據(jù)集或數(shù)據(jù)流的操作。這些操作分布在大數(shù)據(jù)集群的多個節(jié)點上,并行地分布式執(zhí)行。Java

Stream

APIList<String>strings=Arrays.asList("abc","","bc","12345","efg","abcd","","jkl");

List<Integer>lengths=strings.stream().filter(string->!string.isEmpty()).map(s->s.length()).collect(Collectors.toList());

lengths.forEach((s)->System.out.println(s));本章中,我們回顧了Flink開發(fā)經(jīng)常用到的繼承和多態(tài)、泛型和函數(shù)式編程等概念。在之后的編程學習中可以利用這些內(nèi)容更好的實現(xiàn)程序業(yè)務邏輯。3.Flink的設計與運行原理本章將以WordCount程序為主線,介紹Flink的設計與運行原理,主要內(nèi)容有:Flink的數(shù)據(jù)流圖,F(xiàn)link分布式架構(gòu)與核心組件,任務執(zhí)行與資源劃分。

本章所涉及的主要內(nèi)容。熟悉Flink的數(shù)據(jù)流圖。掌握Flink分布式架構(gòu)與核心組件。掌握任務執(zhí)行與資源劃分。數(shù)據(jù)流圖Flink架構(gòu)和核心組件任務執(zhí)行與資源劃分

相關(guān)概念方法(Method):Java或Scala語言中的方法,有輸入?yún)?shù)和返回值。函數(shù)(Function):Flink提供給開發(fā)者的接口flatMap()、keyBy()等算子(Operator):在執(zhí)行層面,算子對數(shù)據(jù)進行操作,一般一到多個函數(shù)對應一個算子。Source、Transformation和Sink

數(shù)據(jù)流圖從代碼到邏輯視圖邏輯視圖中圓圈表示算子,箭頭表示數(shù)據(jù)流可以在Flink

Web

UI中查看一個作業(yè)的邏輯視圖大數(shù)據(jù)框架的算子對計算做了抽象,方便用戶進行并行計算、橫向擴展和故障恢復邏輯視圖

分布式環(huán)境下并行化物理執(zhí)行數(shù)據(jù)流被切分到多個分區(qū)(Partition)算子被切分為算子子任務(Operator

Subtask),又被稱為算子實例物理執(zhí)行的基本單元并行度(Parallelism):衡量并行切分的多少物理執(zhí)行數(shù)據(jù)在不同算子子任務之間數(shù)據(jù)交換常見四種數(shù)據(jù)交換策略:前向傳播(Forward)按Key分組(Keyed-Based)廣播(Broadcast)隨機(Random)數(shù)據(jù)交換策略數(shù)據(jù)流圖Flink架構(gòu)和核心組件任務執(zhí)行與資源劃分Master協(xié)調(diào)管理DispatcherResourceManagerJobManagerTaskManager擁有CPU、內(nèi)存等計算資源Flink作業(yè)被分發(fā)到多個TaskManager上并行執(zhí)行主從架構(gòu)啟動一個Flink集群,TaskManager進程啟動后會將自己注冊給Master的ResourceManagerClient提交作業(yè)(Application)Master的Dispatcher接收作業(yè),啟動JobManagerJobManager向ResourceManager申請資源,ResourceManager會將閑置資源分配給JobManager作業(yè)轉(zhuǎn)化為物理執(zhí)行圖,計算任務分發(fā)部署到多個TaskManager上作業(yè)提交過程作業(yè)提交流程ClientFlink主目錄下的bin目錄中的命令行工具將用戶作業(yè)轉(zhuǎn)換為JobGraphDispatcher接收多個作業(yè),為每個作業(yè)分配一個JobManagerJobManager單個作業(yè)的協(xié)調(diào)者,每個作業(yè)有一個JobManager將JobGraph轉(zhuǎn)化為物理執(zhí)行圖ExecutionGraph向ResourceManager申請資源管理TaskManager,將具體計算任務分發(fā)部署到多個TaskManager上Flink核心組件介紹ResourceManager統(tǒng)一處理資源分配上的問題獲取計算資源、分配給具體計算作業(yè)TaskManager負責具體計算任務的執(zhí)行提供一定量的任務槽位(TaskSlot,簡稱Slot),F(xiàn)link作業(yè)運行在這些Slot上Slot會注冊到ResourceManager上,ResourceManager分配這些Slot給具體的作業(yè)部署層Local、Cluster、Cloud運行時層分布式運行時API層流處理-

DataStream

API批處理–

DataSet

API上層工具基于DataStream/DataSet

API的上層工具Flink組件棧數(shù)據(jù)流圖Flink架構(gòu)和核心組件任務執(zhí)行與資源劃分StreamGraph根據(jù)用戶代碼生成的圖JobGraphStreamGraph優(yōu)化之后生成JobGraph算子鏈ExecutionGraphJobGraph的分布式并行版本物理執(zhí)行圖部署到TaskManager上的具體計算任務再談邏輯視圖到物理執(zhí)行圖算子鏈將相近的算子子任務鏈接在一起鏈接后形成任務(Task)Task以線程的形式被TaskManager調(diào)度可以降低算子子任務之間的傳輸開銷任務、算子子任務與算子鏈上圖中,Source和FlatMap鏈接到了一起,其他算子發(fā)生了跨分區(qū)數(shù)據(jù)交換,無法鏈接到一起。Task

SlotTaskManager下有多個Task

Slot每個Task

Slot中運行著某些TaskSlot之間的內(nèi)存相互隔離Slot內(nèi)部共享TCP連接、心跳等允許用戶設置TaskManager中的Slot的數(shù)目建議將TaskManager下Slot數(shù)設置為CPU核心數(shù)任務槽位與計算資源Slot與TaskManager多個Task共享一個Slot數(shù)據(jù)交換成本更低右圖中,Source和FlatMap計算量不大,WindowAggregation計算量較大,資源互補增加并行度后,在同樣的計算資源基礎上,可以部署更多算子實例,處理的數(shù)據(jù)量更大槽位共享將并行度由2改為6槽位共享后,多個Task共享一個Slot可以增大并行度,有限的資源上處理更多數(shù)據(jù)并行度邏輯視圖并行切分為多個算子子任務每個算子子任務處理輸入數(shù)據(jù)的一部分輸入數(shù)據(jù)量增大時,可適當增大并行度槽位數(shù)目針對TaskManager設置資源切分粒度并行度與槽位數(shù)目本章中,我們以WordCount案例為主線分析了Flink的設計和運行原理。我們重點介紹了一個作業(yè)從數(shù)據(jù)流圖到物理執(zhí)行圖的轉(zhuǎn)化過程,并介紹了轉(zhuǎn)化過程中所涉及的數(shù)據(jù)結(jié)構(gòu)。Flink是基于主從架構(gòu)的,我們通過一個作業(yè)提交的案例,介紹了Flink的核心組件各自的主要功能,包括Client、Dispatcher、JobManager、ResourceManager和TaskManager。最后,我們介紹了Flink的任務執(zhí)行和資源劃分原理,重點分析了算子子任務是如何部署到TaskSlot上。Flink提供了算子鏈和槽位共享等方式,允許開發(fā)者優(yōu)化資源劃分過程。4.DataStreamAPI的介紹和使用本章將詳細介紹DataStreamAPI中各函數(shù)的使用方法。Flink處理程序應該包含三部分:數(shù)據(jù)源(Source)、轉(zhuǎn)換操作(Transformation)、結(jié)果接收(Sink)。我們從這三部分來介紹DataStreamAPI的相關(guān)內(nèi)容。主要包括DataStreamAPI介紹和示例使用、應用技巧、基本知識點總結(jié)和需要注意事項。通過本節(jié)學習您將可以:熟悉Flink程序的骨架結(jié)構(gòu)。熟悉各函數(shù)的功能和使用方法。了解Flink的數(shù)據(jù)類型和序列化。學會用戶自定義函數(shù)。Flink程序的骨架結(jié)構(gòu)常見Transformation的使用方法數(shù)據(jù)類型和序列化用戶自定義函數(shù)

Flink程序的骨架結(jié)構(gòu)初始化運行環(huán)境讀取一到多個Source數(shù)據(jù)源根據(jù)業(yè)務邏輯對數(shù)據(jù)流進行Transformation轉(zhuǎn)換將結(jié)果輸出到Sink調(diào)用作業(yè)執(zhí)行函數(shù)執(zhí)行環(huán)境是作業(yè)與集群交互的入口設置并行度關(guān)閉算子鏈時間、Checkpoint…流處理和批處理的執(zhí)行環(huán)境不一樣Java、Scala兩套API設置執(zhí)行環(huán)境//創(chuàng)建Flink執(zhí)行環(huán)境

StreamExecutionEnvironmentenv=StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(2);env.disableOperatorChaining();

Source、Transformation和SinkSource讀取數(shù)據(jù)源統(tǒng)稱為Source文件系統(tǒng)、消息隊列、數(shù)據(jù)庫等Transformation使用Flink提供的各類函數(shù),進行有狀態(tài)的計算數(shù)據(jù)流的分組、窗口和聚合操作等Sink將計算結(jié)果輸出到外部系統(tǒng),統(tǒng)稱為Sink目的地可以是文件系統(tǒng)、消息隊列、數(shù)據(jù)庫等Flink是延遲執(zhí)行(LazyEvaluation)的調(diào)用execute()方法,F(xiàn)link才會真正執(zhí)行否則無法得到計算結(jié)果字符串參數(shù)為當前作業(yè)名執(zhí)行//execute

env.execute("kafkastreamingwordcount");Flink程序的骨架結(jié)構(gòu)常見Transformation的使用方法數(shù)據(jù)類型和序列化用戶自定義函數(shù)數(shù)據(jù)傳輸、持久化序列化:將內(nèi)存對象轉(zhuǎn)換成二進制串、網(wǎng)絡可傳輸或可持久化反序列化:將二進制串轉(zhuǎn)換為內(nèi)存對象,可直接在編程語言中讀寫和操作常見序列化方式:JSONJava、Kryo、Avro、Thrift、ProtobufFlink開發(fā)了自己的序列化框架更早地完成類型檢查節(jié)省數(shù)據(jù)存儲空間序列化和反序列化基礎類型Java、Scala基礎數(shù)據(jù)類型數(shù)組復合類型Scala

case

classJava

POJOTuple輔助類型Option、List、Map泛型和其他類型GenericFlink支持的數(shù)據(jù)類型TypeInformaton用來表示數(shù)據(jù)類型,創(chuàng)建序列化器每種數(shù)據(jù)類型都對應一個TypeInfomationTupleTypeInfo、PojoTypeInfo

…TypeInformationFlink會自動推斷類型,調(diào)用對應的序列化器,對數(shù)據(jù)進行序列化和反序列化類型推斷和序列化packagemon.typeinfo;public

class

Types{//java.lang.Void

public

static

finalTypeInformation<Void>VOID=BasicTypeInfo.VOID_TYPE_INFO;//java.lang.String

public

static

finalTypeInformation<String>STRING=BasicTypeInfo.STRING_TYPE_INFO;//java.lang.Boolean

public

static

finalTypeInformation<Boolean>BOOLEAN=BasicTypeInfo.BOOLEAN_TYPE_INFO;//java.lang.Integer

public

static

finalTypeInformation<Integer>INT=BasicTypeInfo.INT_TYPE_INFO;//java.lang.Long

public

static

finalTypeInformation<Long>LONG=BasicTypeInfo.LONG_TYPE_INFO;...}一些基礎類型的TypeInformation:Types.STRING是用來表示java.lang.String的TypeInformationTypes.STRING被定義為BasicTypeInfo.STRING_TYPE_INFOSTRING_TYPE_INFO:使用何種序列化器和比較器類型推斷和序列化public

static

finalBasicTypeInfo<String> STRING_TYPE_INFO= newBasicTypeInfo<>( String.class, newClass<?>[]{}, StringSerializer.INSTANCE, StringComparator.class);STRING_TYPE_INFO定義使用何種序列化器和比較器:在聲明式文件中定義Schema使用工具將Schema轉(zhuǎn)換為Java可用的類Avro

Specific生成的類與POJO類似有g(shù)etter、setter方法在Flink中可以像使用POJO一樣使用Avro

Specific模式Avro

Generic不生成具體的類用GenericRecord封裝所有用戶定義的數(shù)據(jù)結(jié)構(gòu)必須給Flink提供Schema信息Avro{"namespace":"org.apache.flink.tutorials.avro","type":"record","name":"MyPojo","fields":[ {"name":"id","type":"int"}, {"name":"name","type":"string"}]}Avro聲明式文件:Kryo是大數(shù)據(jù)領(lǐng)域經(jīng)常使用的序列化框架Flink無法推斷出數(shù)據(jù)類型時,將該數(shù)據(jù)類型定義為GenericTypeInfo,使用Kryo作為后備選項進行序列化最好實現(xiàn)自己的序列化器,并對數(shù)據(jù)類型和序列化器進行注冊Kryo在有些場景效率不高env.getConfig.disableGenericTypes()禁用Kryo,可以定位到具體哪個類型無法被Flink自動推斷,然后針對該類型創(chuàng)建更高效的序列化器Kryo注冊數(shù)據(jù)類型和序列化器://將MyCustomType類進行注冊

env.getConfig().registerKryoType(MyCustomType.class);//或者使用下面的方式并且實現(xiàn)自定義序列化器

env.getConfig().registerTypeWithKryoSerializer(MyCustomType.class,MyCustomSerializer.class);static

class

MyClassSerializer

extends

Serializer<MyCustomType>implements

Serializable

{private

static

final

longserialVersionUID=...@Overridepublic

void

write(Kryokryo,Outputoutput,MyCustomTypemyCustomType)

{...}@OverridepublicMyCustomTyperead(Kryokryo,Inputinput,Class<MyCustomType>type)

{...}}與Avro

Specific模式相似,使用聲明式語言定義Schema,使用工具將聲明式語言轉(zhuǎn)化為Java類有人已經(jīng)實現(xiàn)好Kryo的序列化器案例:MyCustomType是使用Thrift工具生成的Java類,TBaseSerializer是com.twitter:chill-thrift包中別人實現(xiàn)好的序列化器,該序列化器基于Kryo的Serializer。注意在pom.xml中添加相應的依賴Thrift、Protobuf//GoogleProtobuf

//MyCustomType類是使用Protobuf生成的Java類

//ProtobufSerializer是別人實現(xiàn)好的序列化器

env.getConfig().registerTypeWithKryoSerializer(MyCustomType.class,ProtobufSerializer.class);//ApacheThrift

//MyCustomType是使用Thrift生成的Java類

//TBaseSerializer是別人實現(xiàn)好的序列化器

env.getConfig().addDefaultKryoSerializer(MyCustomType.class,TBaseSerializer.class);Flink的數(shù)據(jù)類型:Java、Scala、Table

API分別有自己的數(shù)據(jù)類型體系絕大多數(shù)情況下,程序員不需要關(guān)心使用何種TypeInformation,只需要使用自己所需的數(shù)據(jù)類型Flink會做類型推斷、選擇對應的序列化器當自動類型推斷失效,用戶需要關(guān)注TypeInformation數(shù)據(jù)類型選擇:需要考慮:上下游的數(shù)據(jù)結(jié)構(gòu)、序列化器的性能、狀態(tài)數(shù)據(jù)的持續(xù)迭代能力POJO和Tuple等內(nèi)置類型性能更好Avro、Thrift和Protobuf對上下游數(shù)據(jù)的兼容性更好,不需要在Flink應用中重新設計一套POJOPOJO和Avro對Flink狀態(tài)數(shù)據(jù)的持續(xù)迭代更友好數(shù)據(jù)類型小結(jié)Flink程序的骨架結(jié)構(gòu)常見Transformation的使用方法數(shù)據(jù)類型和序列化用戶自定義函數(shù)單數(shù)據(jù)流轉(zhuǎn)換基于Key的分組轉(zhuǎn)換多數(shù)據(jù)流轉(zhuǎn)換數(shù)據(jù)重分布轉(zhuǎn)換DataStream<T>泛型T為數(shù)據(jù)流中每個元素的類型四類Tranformation轉(zhuǎn)換每個輸入元素對應一個輸出元素重寫MapFunction或RichMapFunctionMapFunction<T,O>

T為輸入類型O為輸出類型實現(xiàn)其中的map()虛方法主邏輯中調(diào)用該函數(shù)單數(shù)據(jù)流轉(zhuǎn)換-

map@FunctionalInterfacepublic

interface

MapFunction<T,O>extends

Function,Serializable{//調(diào)用這個API就是繼承并實現(xiàn)這個虛函數(shù)

Omap(Tvalue)

throwsException;}//第一個泛型是輸入類型,第二個泛型是輸出類型

public

static

class

DoubleMapFunction

implements

MapFunction<Integer,String>{@OverridepublicStringmap(Integerinput)

{ return

"functioninput:"+input+",output:"+(input*2);}}DataStream<String>functionDataStream=dataStream.map(newDoubleMapFunction());MapFunction源代碼一個MapFunction的實現(xiàn)直接繼承接口類并實現(xiàn)map虛方法上頁所示使用匿名類使用Lambda表達式單數(shù)據(jù)流轉(zhuǎn)換-

map//匿名類

DataStream<String>anonymousDataStream=dataStream.map(newMapFunction<Integer,String>(){@OverridepublicStringmap(Integerinput)

throwsException{ return

"anonymousfunctioninput:"+input+",output:"+(input*2);}});//使用Lambda表達式

DataStream<String>lambdaStream=dataStream .map(input->"lambdainput:"+input+",output:"+(input*2));匿名類實現(xiàn)MapFunctionLambda表達式實現(xiàn)MapFunction對輸入元素進行過濾繼承并實現(xiàn)FilterFunction或RichFilterFunction重寫filter虛方法True

–保留False

–過濾單數(shù)據(jù)流轉(zhuǎn)換-

filterDataStream<Integer>dataStream=senv.fromElements(1,2,-3,0,5,-9,8);//使用->構(gòu)造Lambda表達式

DataStream<Integer>lambda=dataStream.filter(input->input>0);public

static

class

MyFilterFunction

extends

RichFilterFunction<Integer>{//limit參數(shù)可以從外部傳入

privateIntegerlimit;public

MyFilterFunction(Integerlimit)

{this.limit=limit;}@Overridepublic

boolean

filter(Integerinput)

{ returninput>this.limit;}}Lambda表達式實現(xiàn)FilterFunction實現(xiàn)FilterFunction與map()相似輸出零個、一個或多個元素可對列表結(jié)果展平單數(shù)據(jù)流轉(zhuǎn)換-

flatMap{蘋果,梨,香蕉}.map(去皮){去皮蘋果,去皮梨,去皮香蕉}mapflatMap{蘋果,梨,香蕉}.flatMap(切碎){[蘋果碎片1,蘋果碎片2],[梨碎片1,梨碎片2,梨碎片3],[香蕉碎片1]}{蘋果碎片1,蘋果碎片2,梨碎片1,梨碎片2,梨碎片3,香蕉碎片1}使用Lambda表達式Collector用來收集元素flatMap()虛方法中不使用return返回數(shù)據(jù),使用Collector收集返回數(shù)據(jù)Collector<String>中的泛型String為返回數(shù)據(jù)類型將flatMap()看做map()和filter()更一般的形式map()和filter()的語義更明確單數(shù)據(jù)流轉(zhuǎn)換-

flatMapDataStream<String>dataStream=senv.fromElements("HelloWorld","HellothisisFlink");

//split函數(shù)的輸入為"HelloWorld"輸出為"Hello"和"World"組成的列表["Hello","World"]

//flatMap將列表中每個元素提取出來

//最后輸出為["Hello","World","Hello","this","is","Flink"]

DataStream<String>words=dataStream.flatMap((Stringinput,Collector<String>collector)->{

for(Stringword:input.split("")){

collector.collect(word);

}}).returns(Types.STRING);數(shù)據(jù)分組后可進行聚合操作keyBy()將一個DataStream轉(zhuǎn)化為一個KeyedStream聚合操作將KeyedStream轉(zhuǎn)化為DataStreamKeyedStream繼承自DataStream基于Key的分組轉(zhuǎn)換根據(jù)某種屬性或數(shù)據(jù)的某些字段對數(shù)據(jù)進行分組對一個分組內(nèi)的數(shù)據(jù)進行處理股票:相同股票代號的數(shù)據(jù)分組到一起相同Key的數(shù)據(jù)被分配到同一算子實例上需要指定Key數(shù)字位置字段名KeySelector基于Key的分組轉(zhuǎn)換-

keyByDataStream<Tuple2<Integer,Double>>dataStream=senv.fromElements( Tuple2.of(1,1.0),Tuple2.of(2,3.2), Tuple2.of(1,5.5),Tuple2.of(3,10.0),Tuple2.of(3,12.5));//使用數(shù)字位置定義Key按照第一個字段進行分組

DataStream<Tuple2<Integer,Double>>keyedStream=dataStream.keyBy(0).sum(1);KeySelector重寫getKey()方法單數(shù)據(jù)流轉(zhuǎn)換-

keyBy//IN為數(shù)據(jù)流元素,KEY為所選擇的Key

@FunctionalInterfacepublic

interface

KeySelector<IN,KEY>extends

Function,Serializable

{//選擇一個字段作為Key

KEYgetKey(INvalue)

throwsException;}public

class

Word{publicStringword;public

intcount;}//使用KeySelector

DataStream<Word>keySelectorStream=wordStream.keyBy(newKeySelector<Word,String>(){@OverridepublicStringgetKey(Wordin)

{returnin.word;}}).sum("count");KeySelector源碼一個KeySelector的實現(xiàn)sum()、max()、min()等指定字段,對該字段進行聚合KeySelector流數(shù)據(jù)上的聚合實時不斷輸出到下游狀態(tài)存儲中間數(shù)據(jù)單數(shù)據(jù)流轉(zhuǎn)換–

Aggregations將某個字段加和結(jié)果保存到該字段上不關(guān)心其他字段的計算結(jié)果單數(shù)據(jù)流轉(zhuǎn)換–

sumDataStream<Tuple3<Integer,Integer,Integer>>tupleStream=

senv.fromElements( Tuple3.of(0,0,0),Tuple3.of(0,1,1), Tuple3.of(0,2,2),Tuple3.of(1,0,6), Tuple3.of(1,1,7),Tuple3.of(1,0,8));DataStream<Tuple3<Integer,Integer,Integer>>sumStream=tupleStream.keyBy(0).sum(1);//按第一個字段分組,對第二個字段求和,打印出來的結(jié)果如下:

//(0,0,0)

//(0,1,0)

//(0,3,0)

//(1,0,6)

//(1,1,6)

//(1,1,6)

max()對該字段求最大值結(jié)果保存到該字段上不保證其他字段的計算結(jié)果maxBy()對該字段求最大值其他字段保留最大值元素的值單數(shù)據(jù)流轉(zhuǎn)換–

max

/

maxByDataStream<Tuple3<Integer,Integer,Integer>>tupleStream=

senv.fromElements( Tuple3.of(0,0,0),Tuple3.of(0,1,1), Tuple3.of(0,2,2),Tuple3.of(1,0,6), Tuple3.of(1,1,7),Tuple3.of(1,0,8));//按第一個字段分組,對第三個字段求最大值max,打印出來的結(jié)果如下:

DataStream<Tuple3<Integer,Integer,Integer>>maxStream=tupleStream.keyBy(0).max(2);//(0,0,0)

//(0,0,1)

//(0,0,2)

//(1,0,6)

//(1,0,7)

//(1,0,8)

//按第一個字段分組,對第三個字段求最大值maxBy,打印出來的結(jié)果如下:

DataStream<Tuple3<Integer,Integer,Integer>>maxByStream=tupleStream.keyBy(0).maxBy(2);//(0,0,0)

//(0,1,1)

//(0,2,2)

//(1,0,6)

//(1,1,7)

//(1,0,8)

比Aggregation更通用在KeyedStream上生效接受兩個輸入,生成一個輸出兩兩合一地匯總操作基于Key的分組轉(zhuǎn)換-

reduce實現(xiàn)ReduceFunction基于Key的分組轉(zhuǎn)換-

reducepublic

static

class

MyReduceFunction

implements

ReduceFunction<Score>{@OverridepublicScorereduce(Scores1,Scores2)

{ returnScore.of(,"Sum",s1.score+s2.score);}}DataStream<Score>dataStream=senv.fromElements( Score.of("Li","English",90),Score.of("Wang","English",88), Score.of("Li","Math",85),Score.of("Wang","Math",92), Score.of("Liu","Math",91),Score.of("Liu","English",87));//實現(xiàn)ReduceFunction

DataStream<Score>sumReduceFunctionStream=dataStream.keyBy("name").reduce(newMyReduceFunction());//使用Lambda表達式

DataStream<Score>sumLambdaStream=dataStream .keyBy("name")

.reduce((s1,s2)->Score.of(,"Sum",s1.score+s2.score));將多個同類型的DataStream<T>合并為一個DataStream<T>數(shù)據(jù)按照先進先出(FIFO)合并多數(shù)據(jù)流轉(zhuǎn)換-

unionDataStream<StockPrice>shenzhenStockStream=...DataStream<StockPrice>hongkongStockStream=...DataStream<StockPrice>shanghaiStockStream=...DataStream<StockPrice>unionStockStream=shenzhenStockStream.union(hongkongStockStream,shanghaiStockStream);只能連接兩個DataStream數(shù)據(jù)流兩個數(shù)據(jù)流類型可以不一致兩個DataStream經(jīng)過connect()之后轉(zhuǎn)化為ConnectedStreams,ConnectedStreams會對兩個流的數(shù)據(jù)應用不同的處理方法,且雙流之間可以共享狀態(tài)應用場景為:使用一個控制流對另一個數(shù)據(jù)流進行控制多數(shù)據(jù)流轉(zhuǎn)換-

connect重寫CoMapFunction或CoFlatMapFunction三個泛型,分別對應第一個輸入流的數(shù)據(jù)類型、第二個輸入流的數(shù)據(jù)類型和輸出流的數(shù)據(jù)類型對于CoFlatMapFunction,flatMap1()方法處理第一個流的數(shù)據(jù),flatMap2()方法處理第二個流的數(shù)據(jù)可以做到類似SQL

Join的效果多數(shù)據(jù)流轉(zhuǎn)換-

connect//IN1為第一個輸入流的數(shù)據(jù)類型

//IN2為第二個輸入流的數(shù)據(jù)類型

//OUT為輸出類型

public

interface

CoFlatMapFunction<IN1,IN2,OUT>extends

Function,Serializable

{//處理第一個流的數(shù)據(jù)

void

flatMap1(IN1value,Collector<OUT>out)

throwsException;//處理第二個流的數(shù)據(jù)

void

flatMap2(IN2value,Collector<OUT>out)

throwsException;}//CoMapFunction三個泛型分別對應第一個流的輸入、第二個流的輸入,map之后的輸出

public

static

class

MyCoMapFunction

implements

CoMapFunction<Integer,String,String>{@OverridepublicStringmap1(Integerinput1)

{ returninput1.toString();}@OverridepublicStringmap2(Stringinput2)

{ returninput2;}}CoFlatMapFunction源代碼一個CoMapFunction實現(xiàn)并行度邏輯視圖中的算子被切分為多個算子子任務每個算子子任務處理一部分數(shù)據(jù)可以在整個作業(yè)的執(zhí)行環(huán)境層面設置也可以對某個算子單獨設置并行度StreamExecutionEnvironmentsenv=StreamExecutionEnvironment.getExecutionEnvironment();//獲取當前執(zhí)行環(huán)境的默認并行度

intdefaultParalleism=senv.getParallelism();//設置所有算子的并行度為4,表示所有算子的并行執(zhí)行的實例數(shù)為4

senv.setParallelism(4);在執(zhí)行環(huán)境中設置并行度:對某個算子單獨設置:dataStream.map(newMyMapper()).setParallelism(defaultParallelism*2);默認情況下,數(shù)據(jù)自動分布到多個實例(或者稱之為分區(qū))上手動在多個實例上進行數(shù)據(jù)分配避免數(shù)據(jù)傾斜輸入是DataStream,輸出也是DataStream數(shù)據(jù)重分布dataStream.shuffle();基于正態(tài)分布,將數(shù)據(jù)隨機分配到下游各算子實例上:dataStream.broadcast();數(shù)據(jù)會被復制并廣播發(fā)送給下游的所有實例上:dataStream.global();將所有數(shù)據(jù)發(fā)送給下游算子的第一個實例上:

rebalance()使用Round-Ribon思想將數(shù)據(jù)均勻分配到各實例上rescale()就近發(fā)送給下游每個實例數(shù)據(jù)重分布rebalance()將數(shù)據(jù)輪詢式地分布到下游子任務上

當上游有2個子任務、下游有4個子任務時使用rescale()partitionCustom()自定義數(shù)據(jù)重分布邏輯Partitioner[K]中泛型K為根據(jù)哪個字段進行分區(qū)對一個Score類型數(shù)據(jù)流重分布,希望按照id均勻分配到下游各實例,那么泛型K就為id的數(shù)據(jù)類型Long重寫partition()方法數(shù)據(jù)重分布@FunctionalInterfacepublic

interface

Partitioner<K>extends

java.io.Serializable,Function

{//根據(jù)key決定該數(shù)據(jù)分配到下游第幾個分區(qū)(實例)

int

partition(Kkey,intnumPartitions);}/**

*Partitioner<T>其中泛型T為指定的字段類型*重寫partiton函數(shù),并根據(jù)T字段對數(shù)據(jù)流中的所有元素進行數(shù)據(jù)重分配**/

public

static

class

MyPartitioner

implements

Partitioner<String>{privateRandomrand=newRandom();privatePatternpattern=Ppile(".*\\d+.*");/**

*key泛型T即根據(jù)哪個字段進行數(shù)據(jù)重分配,本例中是Tuple2(Int,String)中的String

*numPartitons為當前有多少個并行實例*函數(shù)返回值是一個Int為該元素將被發(fā)送給下游第幾個實例**/

@Overridepublic

int

partition(Stringkey,intnumPartitions)

{intrandomNum=rand.nextInt(numPartitions/2);Matcherm=pattern.matcher(key);if(m.matches()){returnrandomNum;}else{returnrandomNum+numPartitions/2;}}}//對(Int,String)中的第二個字段使用MyPartitioner中的重分布邏輯

DataStream<Tuple2<Integer,String>>partitioned= dataStream.partitionCustom(newMyPartitioner(),1);Partitioner源碼

一個Partitioner的實現(xiàn)Flink程序的骨架結(jié)構(gòu)常見Transformation的使用方法數(shù)據(jù)類型和序列化用戶自定義函數(shù)用戶自定義函數(shù)的三種方式:繼承并實現(xiàn)函數(shù)類使用Lambda表達式繼承并實現(xiàn)Rich函數(shù)類用戶自定義函數(shù)對于map()、flatMap()、reduce()等函數(shù),我們可以實現(xiàn)MapFunction、FlatMapFunction、ReduceFunction等interface接口。以FlatM

溫馨提示

  • 1. 本站所有資源如無特殊說明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請下載最新的WinRAR軟件解壓。
  • 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請聯(lián)系上傳者。文件的所有權(quán)益歸上傳用戶所有。
  • 3. 本站RAR壓縮包中若帶圖紙,網(wǎng)頁內(nèi)容里面會有圖紙預覽,若沒有圖紙預覽就沒有圖紙。
  • 4. 未經(jīng)權(quán)益所有人同意不得將文件中的內(nèi)容挪作商業(yè)或盈利用途。
  • 5. 人人文庫網(wǎng)僅提供信息存儲空間,僅對用戶上傳內(nèi)容的表現(xiàn)方式做保護處理,對用戶上傳分享的文檔內(nèi)容本身不做任何修改或編輯,并不能對任何下載內(nèi)容負責。
  • 6. 下載文件中如有侵權(quán)或不適當內(nèi)容,請與我們聯(lián)系,我們立即糾正。
  • 7. 本站不保證下載資源的準確性、安全性和完整性, 同時也不承擔用戶因使用這些下載資源對自己和他人造成任何形式的傷害或損失。

評論

0/150

提交評論