반응형
NiFi - ExecuteScript (NiFi에서 Python사용하기)
NiFi에서 내장된 프로세서로는 데이터흐름 및 처리가 어려울 때 직접 코딩하여 FlowFile을 제어, 변형할 수 있는 프로세서이다.
만약에 더 높은 수준으로 FlowFile을 처리해야할 경우 직접 프로세서를 만들어야 하는게 더 좋지만 내장 프로세서가 없고 복잡한 처리, 성공과 실패 관계로만 라우팅 할 경우에는 ExecuteScript를 사용해 보는 것도 나쁘지 않다.
무작정 ExecuteScript를 사용하는 건 옳지 못하다.
원하는 프로세서가 정말 없는지 깊게 검색해 보고 없으면 사용할 것.
공식 문서
해당 링크에 코딩 가이드가 나와있다.
여기 블로그에서는 Python 관련 내용 중 내가 자주사용한 내용만 다뤄보도록 할 것이다.
지원 언어
다양한 언어를 지원하고 있다.
- Groovy
- Jython
- Javascript
- JRuby
- 등
여기서 Jython은 Java로 만든 Python입니다.
평소 아는 Python 은 CPython으로 C로 만든 Python입니다.
그래서 Jython은 CPython의 일부 라이브러리(C로 만든 라이브러리)가 호환되지 않습니다. 예 - Pandas등..
CPython과 Jython에서 같이 사용 가능한 라이브러리는 아래와 같습니다.(제가 직접 써본 경험으로만 작성(더 있을수도 있다.))
- json
- urlparse (Jython에서는 `from urlparse import urlparse`, CPython에서는 `from urllib.parse import urlparse`)
- 등
잘 활용하면 매우 좋은 프로세서입니다.
하지만, 인터프리터라는 단점이 있습니다.
스크립트 #1 - FlowFile의 Content를 읽고 쓰기
from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import StreamCallback
class processer(StreamCallback):
def __init__(self):
pass
def process(self,inputStream,outputStream):
original_Data = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
# original_Data 는 이제 FlowFile의 Content내용을 가집니다.
test = "Hello"
outputStream.write(bytearray(test.encode('utf-8')))
# FlowFile의 Content내용은 이제 Hello 입니다.
flowFile = session.get()
if (flowFile != None):
flowFile = session.write(flowFile, processer())
session.transfer(flowFile, REL_SUCCESS)
만약 FlowFile의 Content가 JSON인 경우에는
최상단에 `import json`을 해주고,
`json_data = json.loads(original_Data)` 와 같은 구문을 넣어주면 됩니다.
물론 다시 Content를 내보낼땐 `json.dumps`또한 해주면 됩니다.
스크립트 #2 - FlowFile의 Attribute를 Content로 쓰기
from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import StreamCallback
class processer(StreamCallback):
att = None
def __init__(self, att):
self.att = att
def process(self,outputStream):
outputStream.write(bytearray(self.att.encode('utf-8')))
# FlowFile의 Content내용은 이제 FlowFile에서 Attribute이름이 test_att의 값 입니다.
flowFile = session.get()
if (flowFile != None):
att123 = flowFile.getAttribute('test_att')
# 변수 att123은 flowFile에 있는 Attributes중 이름이 test_att 인 값을 가져옵니다.
flowFile = session.write(flowFile, processer(att123))
session.transfer(flowFile, REL_SUCCESS)
스크립트 #3 - 기타 Script들
FlowFile을 실패로 전송해야 할 경우
session.transfer(flowFile, REL_FAILURE)
모든 Attribute를 가져오고 싶은 경우
for key,value in flowFile.getAttributes().iteritems():
# 처리문 작성
여러 Attribute를 추가하고 싶은 경우
flowFile = session.putAllAttributes(flowFile, attrMap)
# attrMap은 파이썬의 딕셔너리 형태이어야 합니다.
모든 내용은 공식문서에서 가져왔습니다.
반응형