+
Skip to content

KubernetesPodOperator should retry log tailing in case of interruption #11325

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
7 commits merged on Oct 9, 2020 (source and target branch names were lost in extraction)
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 50 additions & 6 deletions airflow/kubernetes/pod_launcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,12 @@
# under the License.
"""Launches PODs"""
import json
import math
import time
from datetime import datetime as dt
from typing import Optional, Tuple

import pendulum
import tenacity
from kubernetes import client, watch
from kubernetes.client.models.v1_pod import V1Pod
Expand Down Expand Up @@ -124,9 +126,23 @@ def monitor_pod(self, pod: V1Pod, get_logs: bool) -> Tuple[State, Optional[str]]
:return: Tuple[State, Optional[str]]
"""
if get_logs:
logs = self.read_pod_logs(pod)
for line in logs:
self.log.info(line)
read_logs_since_sec = None
last_log_time = None
while True:
logs = self.read_pod_logs(pod, timestamps=True, since_seconds=read_logs_since_sec)
for line in logs:
timestamp, message = self.parse_log_line(line.decode('utf-8'))
last_log_time = pendulum.parse(timestamp)
self.log.info(message)
time.sleep(1)

if not self.base_container_is_running(pod):
break

self.log.warning('Pod %s log read interrupted', pod.metadata.name)
delta = pendulum.now() - last_log_time
# Prefer logs duplication rather than loss
read_logs_since_sec = math.ceil(delta.total_seconds())
result = None
if self.extract_xcom:
while self.base_container_is_running(pod):
Expand All @@ -140,6 +156,22 @@ def monitor_pod(self, pod: V1Pod, get_logs: bool) -> Tuple[State, Optional[str]]
time.sleep(2)
return self._task_status(self.read_pod(pod)), result

def parse_log_line(self, line: str) -> Tuple[str, str]:
    """
    Split a timestamped Kubernetes log line into its timestamp and message.

    Lines are expected in the ``"{timestamp} {message}"`` form that the API
    returns when logs are read with ``timestamps=True``. The split happens at
    the first space only, so spaces inside the message are preserved.

    :param line: k8s log line, already decoded to ``str``
    :type line: str
    :return: timestamp and log message; trailing whitespace (including the
        newline) is stripped from the message
    :rtype: Tuple[str, str]
    :raises ValueError: if the line contains no space separator
    """
    timestamp, separator, message = line.partition(' ')
    if not separator:
        # ValueError subclasses Exception, so callers catching Exception keep working.
        raise ValueError('Log not in "{{timestamp}} {{log}}" format. Got: {}'.format(line))
    return timestamp, message.rstrip()

def _task_status(self, event):
self.log.info(
'Event: %s had an event of type %s',
Expand Down Expand Up @@ -171,16 +203,28 @@ def base_container_is_running(self, pod: V1Pod):
wait=tenacity.wait_exponential(),
reraise=True
)
def read_pod_logs(self, pod: V1Pod, tail_lines: int = 10):
def read_pod_logs(self,
pod: V1Pod,
tail_lines: Optional[int] = None,
timestamps: bool = False,
since_seconds: Optional[int] = None):
"""Reads log from the POD"""
additional_kwargs = {}
if since_seconds:
additional_kwargs['since_seconds'] = since_seconds

if tail_lines:
additional_kwargs['tail_lines'] = tail_lines

try:
return self._client.read_namespaced_pod_log(
name=pod.metadata.name,
namespace=pod.metadata.namespace,
container='base',
follow=True,
tail_lines=tail_lines,
_preload_content=False
timestamps=timestamps,
_preload_content=False,
**additional_kwargs
)
except BaseHTTPError as e:
raise AirflowException(
Expand Down
2 changes: 1 addition & 1 deletion kubernetes_tests/test_kubernetes_pod_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -404,7 +404,7 @@ def test_volume_mount(self):
)
context = create_context(k)
k.execute(context=context)
mock_logger.info.assert_any_call(b"retrieved from mount\n")
mock_logger.info.assert_any_call('retrieved from mount')
actual_pod = self.api_client.sanitize_for_serialization(k.pod)
self.expected_pod['spec']['containers'][0]['args'] = args
self.expected_pod['spec']['containers'][0]['volumeMounts'] = [{
Expand Down
42 changes: 37 additions & 5 deletions tests/kubernetes/test_pod_launcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,17 +48,17 @@ def test_read_pod_logs_retries_successfully(self):
_preload_content=False,
container='base',
follow=True,
timestamps=False,
name=mock.sentinel.metadata.name,
namespace=mock.sentinel.metadata.namespace,
tail_lines=10
namespace=mock.sentinel.metadata.namespace
),
mock.call(
_preload_content=False,
container='base',
follow=True,
timestamps=False,
name=mock.sentinel.metadata.name,
namespace=mock.sentinel.metadata.namespace,
tail_lines=10
namespace=mock.sentinel.metadata.namespace
)
])

Expand All @@ -80,19 +80,39 @@ def test_read_pod_logs_successfully_with_tail_lines(self):
self.mock_kube_client.read_namespaced_pod_log.side_effect = [
mock.sentinel.logs
]
logs = self.pod_launcher.read_pod_logs(mock.sentinel, 100)
logs = self.pod_launcher.read_pod_logs(mock.sentinel, tail_lines=100)
self.assertEqual(mock.sentinel.logs, logs)
self.mock_kube_client.read_namespaced_pod_log.assert_has_calls([
mock.call(
_preload_content=False,
container='base',
follow=True,
timestamps=False,
name=mock.sentinel.metadata.name,
namespace=mock.sentinel.metadata.namespace,
tail_lines=100
),
])

def test_read_pod_logs_successfully_with_since_seconds(self):
    """read_pod_logs should pass since_seconds through to the Kubernetes client."""
    mock.sentinel.metadata = mock.MagicMock()
    read_log = self.mock_kube_client.read_namespaced_pod_log
    read_log.side_effect = [mock.sentinel.logs]

    returned = self.pod_launcher.read_pod_logs(mock.sentinel, since_seconds=2)

    self.assertEqual(mock.sentinel.logs, returned)
    expected_call = mock.call(
        _preload_content=False,
        container='base',
        follow=True,
        timestamps=False,
        name=mock.sentinel.metadata.name,
        namespace=mock.sentinel.metadata.namespace,
        since_seconds=2,
    )
    read_log.assert_has_calls([expected_call])

def test_read_pod_events_successfully_returns_events(self):
mock.sentinel.metadata = mock.MagicMock()
self.mock_kube_client.list_namespaced_event.return_value = mock.sentinel.events
Expand Down Expand Up @@ -162,3 +182,15 @@ def test_read_pod_retries_fails(self):
self.pod_launcher.read_pod,
mock.sentinel
)

def test_parse_log_line(self):
    """parse_log_line splits at the first space and rejects lines without one."""
    timestamp, message = self.pod_launcher.parse_log_line(
        '2020-10-08T14:16:17.793417674Z Valid message\n'
    )

    self.assertEqual(timestamp, '2020-10-08T14:16:17.793417674Z')
    self.assertEqual(message, 'Valid message')

    # Use the context-manager form so the call happens inside assertRaises.
    # The input has no space at all; an input with any space (e.g. "...ZInvalid message")
    # would be split successfully and would not exercise the error path.
    with self.assertRaises(Exception):
        self.pod_launcher.parse_log_line('2020-10-08T14:16:17.793417674ZInvalidmessage\n')
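(Note: a web-proxy banner injected here during scraping — "Click: this is a PHP browser service provided by indexloc; do not enter any passwords or download anything" — is not part of the pull request.)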