NiFi - ExecuteScript (NiFi에서 Python사용하기)
NiFi에서 내장된 프로세서로는 데이터흐름 및 처리가 어려울 때 직접 코딩하여 FlowFile을 제어, 변형할 수 있는 프로세서이다.
만약에 더 높은 수준으로 FlowFile을 처리해야할 경우 직접 프로세서를 만들어야 하는게 더 좋지만 내장 프로세서가 없고 복잡한 처리, 성공과 실패 관계로만 라우팅 할 경우에는 ExecuteScript를 사용해 보는 것도 나쁘지 않다.
무작정 ExecuteScript를 사용하는 건 옳지 못하다.
원하는 프로세서가 정말 없는지 깊게 검색해 보고 없으면 사용할 것.
공식 문서
ExecuteScript
Dynamic Properties Any dynamic (user-defined) properties defined in ExecuteScript are passed to the script engine as variables set to the PropertyValue object corresponding to the dynamic property. This allows you to get the String value of the property, b
nifi.apache.org
해당 링크에 코딩 가이드가 나와있다.
여기 블로그에서는 Python 관련 내용 중 내가 자주사용한 내용만 다뤄보도록 할 것이다.
지원 언어
다양한 언어를 지원하고 있다.
- Groovy
- Jython
- Javascript
- JRuby
- 등
여기서 Jython은 Java로 만든 Python입니다.
평소 아는 Python 은 CPython으로 C로 만든 Python입니다.
그래서 Jython은 CPython의 일부 라이브러리(C로 만든 라이브러리)가 호환되지 않습니다. 예 - Pandas등..
CPython과 Jython에서 같이 사용 가능한 라이브러리는 아래와 같습니다.(제가 직접 써본 경험으로만 작성(더 있을수도 있다.))
- json
- urlparse (Jython에서는 `from urlparse import urlparse`, CPython에서는 `from urllib.parse import urlparse`)
- 등
잘 활용하면 매우 좋은 프로세서입니다.
하지만, 인터프리터라는 단점이 있습니다.
스크립트 #1 - FlowFile의 Content를 읽고 쓰기
from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import StreamCallback
class processer(StreamCallback):
def __init__(self):
pass
def process(self,inputStream,outputStream):
original_Data = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
# original_Data 는 이제 FlowFile의 Content내용을 가집니다.
test = "Hello"
outputStream.write(bytearray(test.encode('utf-8')))
# FlowFile의 Content내용은 이제 Hello 입니다.
flowFile = session.get()
if (flowFile != None):
flowFile = session.write(flowFile, processer())
session.transfer(flowFile, REL_SUCCESS)
만약 FlowFile의 Content가 JSON인 경우에는
최상단에 `import json`을 해주고,
`json_data = json.loads(original_Data)` 와 같은 구문을 넣어주면 됩니다.
물론 다시 Content를 내보낼땐 `json.dumps`또한 해주면 됩니다.
스크립트 #2 - FlowFile의 Attribute를 Content로 쓰기
from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import StreamCallback
class processer(StreamCallback):
att = None
def __init__(self, att):
self.att = att
def process(self,outputStream):
outputStream.write(bytearray(self.att.encode('utf-8')))
# FlowFile의 Content내용은 이제 FlowFile에서 Attribute이름이 test_att의 값 입니다.
flowFile = session.get()
if (flowFile != None):
att123 = flowFile.getAttribute('test_att')
# 변수 att123은 flowFile에 있는 Attributes중 이름이 test_att 인 값을 가져옵니다.
flowFile = session.write(flowFile, processer(att123))
session.transfer(flowFile, REL_SUCCESS)
스크립트 #3 - 기타 Script들
FlowFile을 실패로 전송해야 할 경우
session.transfer(flowFile, REL_FAILURE)
모든 Attribute를 가져오고 싶은 경우
for key,value in flowFile.getAttributes().iteritems():
# 처리문 작성
여러 Attribute를 추가하고 싶은 경우
flowFile = session.putAllAttributes(flowFile, attrMap)
# attrMap은 파이썬의 딕셔너리 형태이어야 합니다.
모든 내용은 공식문서에서 가져왔습니다.
NiFi - ExecuteScript (NiFi에서 Python사용하기)
NiFi에서 내장된 프로세서로는 데이터흐름 및 처리가 어려울 때 직접 코딩하여 FlowFile을 제어, 변형할 수 있는 프로세서이다.
만약에 더 높은 수준으로 FlowFile을 처리해야할 경우 직접 프로세서를 만들어야 하는게 더 좋지만 내장 프로세서가 없고 복잡한 처리, 성공과 실패 관계로만 라우팅 할 경우에는 ExecuteScript를 사용해 보는 것도 나쁘지 않다.
무작정 ExecuteScript를 사용하는 건 옳지 못하다.
원하는 프로세서가 정말 없는지 깊게 검색해 보고 없으면 사용할 것.
공식 문서
ExecuteScript
Dynamic Properties Any dynamic (user-defined) properties defined in ExecuteScript are passed to the script engine as variables set to the PropertyValue object corresponding to the dynamic property. This allows you to get the String value of the property, b
nifi.apache.org
해당 링크에 코딩 가이드가 나와있다.
여기 블로그에서는 Python 관련 내용 중 내가 자주사용한 내용만 다뤄보도록 할 것이다.
지원 언어
다양한 언어를 지원하고 있다.
- Groovy
- Jython
- Javascript
- JRuby
- 등
여기서 Jython은 Java로 만든 Python입니다.
평소 아는 Python 은 CPython으로 C로 만든 Python입니다.
그래서 Jython은 CPython의 일부 라이브러리(C로 만든 라이브러리)가 호환되지 않습니다. 예 - Pandas등..
CPython과 Jython에서 같이 사용 가능한 라이브러리는 아래와 같습니다.(제가 직접 써본 경험으로만 작성(더 있을수도 있다.))
- json
- urlparse (Jython에서는 `from urlparse import urlparse`, CPython에서는 `from urllib.parse import urlparse`)
- 등
잘 활용하면 매우 좋은 프로세서입니다.
하지만, 인터프리터라는 단점이 있습니다.
스크립트 #1 - FlowFile의 Content를 읽고 쓰기
from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import StreamCallback
class processer(StreamCallback):
def __init__(self):
pass
def process(self,inputStream,outputStream):
original_Data = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
# original_Data 는 이제 FlowFile의 Content내용을 가집니다.
test = "Hello"
outputStream.write(bytearray(test.encode('utf-8')))
# FlowFile의 Content내용은 이제 Hello 입니다.
flowFile = session.get()
if (flowFile != None):
flowFile = session.write(flowFile, processer())
session.transfer(flowFile, REL_SUCCESS)
만약 FlowFile의 Content가 JSON인 경우에는
최상단에 `import json`을 해주고,
`json_data = json.loads(original_Data)` 와 같은 구문을 넣어주면 됩니다.
물론 다시 Content를 내보낼땐 `json.dumps`또한 해주면 됩니다.
스크립트 #2 - FlowFile의 Attribute를 Content로 쓰기
from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import StreamCallback
class processer(StreamCallback):
att = None
def __init__(self, att):
self.att = att
def process(self,outputStream):
outputStream.write(bytearray(self.att.encode('utf-8')))
# FlowFile의 Content내용은 이제 FlowFile에서 Attribute이름이 test_att의 값 입니다.
flowFile = session.get()
if (flowFile != None):
att123 = flowFile.getAttribute('test_att')
# 변수 att123은 flowFile에 있는 Attributes중 이름이 test_att 인 값을 가져옵니다.
flowFile = session.write(flowFile, processer(att123))
session.transfer(flowFile, REL_SUCCESS)
스크립트 #3 - 기타 Script들
FlowFile을 실패로 전송해야 할 경우
session.transfer(flowFile, REL_FAILURE)
모든 Attribute를 가져오고 싶은 경우
for key,value in flowFile.getAttributes().iteritems():
# 처리문 작성
여러 Attribute를 추가하고 싶은 경우
flowFile = session.putAllAttributes(flowFile, attrMap)
# attrMap은 파이썬의 딕셔너리 형태이어야 합니다.
모든 내용은 공식문서에서 가져왔습니다.